You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/05/19 16:38:00 UTC

[3/7] flink git commit: [FLINK-1977] Rework Stream Operators to always be push based

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index c78ec34..fd52624 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -1,19 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.flink.streaming.examples.windowing;
 
@@ -26,18 +26,17 @@ import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
 
 import java.util.Arrays;
 import java.util.Random;
 
 /**
- * An example of grouped stream windowing where different eviction and trigger
- * policies can be used. A source fetches events from cars every 1 sec
- * containing their id, their current speed (kmh), overall elapsed distance (m)
- * and a timestamp. The streaming example triggers the top speed of each car
- * every x meters elapsed for the last y seconds.
- */
+* An example of grouped stream windowing where different eviction and trigger
+* policies can be used. A source fetches events from cars every 1 sec
+* containing their id, their current speed (kmh), overall elapsed distance (m)
+* and a timestamp. The streaming example triggers the top speed of each car
+* every x meters elapsed for the last y seconds.
+*/
 public class TopSpeedWindowingExample {
 
 	public static void main(String[] args) throws Exception {
@@ -75,8 +74,7 @@ public class TopSpeedWindowingExample {
 		env.execute("CarTopSpeedWindowingExample");
 	}
 
-	private static class CarSource implements
-			SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+	private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
 
 		private static final long serialVersionUID = 1L;
 		private Integer[] speeds;
@@ -84,6 +82,8 @@ public class TopSpeedWindowingExample {
 
 		private Random rand = new Random();
 
+		private int carId = 0;
+
 		private CarSource(int numOfCars) {
 			speeds = new Integer[numOfCars];
 			distances = new Double[numOfCars];
@@ -96,28 +96,27 @@ public class TopSpeedWindowingExample {
 		}
 
 		@Override
-		public void run(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
-				throws Exception {
-
-			while (true) {
-				Thread.sleep(1000);
-				for (int carId = 0; carId < speeds.length; carId++) {
-					if (rand.nextBoolean()) {
-						speeds[carId] = Math.min(100, speeds[carId] + 5);
-					} else {
-						speeds[carId] = Math.max(0, speeds[carId] - 5);
-					}
-					distances[carId] += speeds[carId] / 3.6d;
-					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
-							speeds[carId], distances[carId], System.currentTimeMillis());
-					collector.collect(record);
-				}
-			}
+		public boolean reachedEnd() throws Exception {
+			return false;
 		}
 
 		@Override
-		public void cancel() {
+		public Tuple4<Integer, Integer, Double, Long> next() throws Exception {
+			if (rand.nextBoolean()) {
+				speeds[carId] = Math.min(100, speeds[carId] + 5);
+			} else {
+				speeds[carId] = Math.max(0, speeds[carId] - 5);
+			}
+			distances[carId] += speeds[carId] / 3.6d;
+			Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+					speeds[carId], distances[carId], System.currentTimeMillis());
+			carId++;
+			if (carId >= speeds.length) {
+				carId = 0;
+			}
+			return record;
 		}
+
 	}
 
 	private static class ParseCarData extends

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
index 83945a9..4a6929e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -86,10 +86,10 @@ object StockPrices {
     })
 
     //Generate other stock streams
-    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
-    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
-    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
-    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
+    val SPX_Stream = env.addSource(generateStock("SPX")(10))
+    val FTSE_Stream = env.addSource(generateStock("FTSE")(20))
+    val DJI_Stream = env.addSource(generateStock("DJI")(30))
+    val BUX_Stream = env.addSource(generateStock("BUX")(40))
 
     //Merge all stock streams together
     val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
@@ -118,7 +118,7 @@ object StockPrices {
     //Step 4 
     //Read a stream of tweets and extract the stock symbols
 
-    val tweetStream = env.addSource(generateTweets _)
+    val tweetStream = env.addSource(generateTweets)
 
     val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
       .map(_.toUpperCase())
@@ -183,25 +183,24 @@ object StockPrices {
     }
   }
 
-  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
+  def generateStock(symbol: String)(sigma: Int) = {
     var price = 1000.0
-    while (true) {
+    () =>
       price = price + Random.nextGaussian * sigma
-      out.collect(StockPrice(symbol, price))
       Thread.sleep(Random.nextInt(200))
-    }
+      StockPrice(symbol, price)
+
   }
 
   def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
     num.toDouble(ts.sum) / ts.size
   }
 
-  def generateTweets(out: Collector[String]) = {
-    while (true) {
+  def generateTweets = {
+    () =>
       val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
-      out.collect(s.mkString(" "))
       Thread.sleep(Random.nextInt(500))
-    }
+      s.mkString(" ")
   }
 
   private def parseParameters(args: Array[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
index d1fa9c6..7b72a23 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.examples.test.windowing;
 
-import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
 import org.apache.flink.streaming.examples.windowing.TopSpeedWindowingExample;
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
 public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2997f2d..08b2535 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,45 +18,25 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
-  SingleOutputStreamOperator, GroupedDataStream}
-import org.apache.flink.streaming.util.serialization.SerializationSchema
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, FoldFunction, MapFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, GroupedDataStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.SumFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
-
-import scala.collection.JavaConversions._
-
-import java.util.HashMap
-
-import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
-import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamReduce
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce
-import org.apache.flink.streaming.api.operators.StreamFlatMap
-import org.apache.flink.streaming.api.operators.StreamGroupedFold
-import org.apache.flink.streaming.api.operators.StreamMap
-import org.apache.flink.streaming.api.operators.StreamFold
+import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.util.serialization.SerializationSchema
+import org.apache.flink.util.Collector
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -404,8 +384,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     aggregate(aggregationType, position)
   }
 
-  private def aggregate(aggregationType: AggregationType, position: Int):
-    DataStream[T] = {
+  private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
 
     val jStream = javaStream.asInstanceOf[JavaStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -421,12 +400,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
 
     val invokable = jStream match {
-      case groupedStream: GroupedDataStream[_] => new StreamGroupedReduce(reducer,
+      case groupedStream: GroupedDataStream[Product] => new StreamGroupedReduce[Product](reducer,
         groupedStream.getKeySelector())
       case _ => new StreamReduce(reducer)
     }
-    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
-      invokable)).asInstanceOf[DataStream[T]]
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),invokable))
+      .asInstanceOf[DataStream[T]]
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index cbb5fb7..21b2e71 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,14 +18,18 @@
 
 package org.apache.flink.streaming.api.scala
 
+import scala.reflect.ClassTag
+
 import com.esotericsoftware.kryo.Serializer
+import org.apache.commons.lang.Validate
+import org.joda.time.Instant
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
-import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
@@ -295,14 +299,14 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * source functionality.
    *
    */
-  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+  def addSource[T: ClassTag: TypeInformation](function: () => T): DataStream[T] = {
     require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def run(out: Collector[T]) {
-        cleanFun(out)
-      }
-      override def cancel() = {}
+
+      override def reachedEnd(): Boolean = false
+
+      override def next(): T = cleanFun()
     }
     addSource(sourceFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index fd70bfb..124a1fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -107,8 +106,8 @@ public class StreamCheckpointingITCase {
 				private Random rnd;
 				private StringBuilder stringBuilder;
 
+				private int index;
 				private int step;
-
 				private boolean running = true;
 
 				@Override
@@ -116,24 +115,26 @@ public class StreamCheckpointingITCase {
 					rnd = new Random();
 					stringBuilder = new StringBuilder();
 					step = getRuntimeContext().getNumberOfParallelSubtasks();
+					index = getRuntimeContext().getIndexOfThisSubtask();
 				}
 
 				@Override
-				public void run(Collector<String> collector) throws Exception {
-					for (long i = getRuntimeContext().getIndexOfThisSubtask(); running && i < NUM_STRINGS; i += step) {
-						char first = (char) ((i % 40) + 40);
-
-						stringBuilder.setLength(0);
-						stringBuilder.append(first);
-
-						collector.collect(randomString(stringBuilder, rnd));
-					}
+				public boolean reachedEnd() throws Exception {
+					return index >= NUM_STRINGS;
 				}
 
 				@Override
-				public void cancel() {
-					running = false;
+				public String next() throws Exception {
+					char first = (char) ((index % 40) + 40);
+
+					stringBuilder.setLength(0);
+					stringBuilder.append(first);
+
+					String result = randomString(stringBuilder, rnd);
+					index += step;
+					return result;
 				}
+
 			});
 
 			stream

http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index c3a6314..627016c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -143,8 +141,13 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 		private final File coordinateDir;
 		private final long end;
-		
+
+		private long toCollect;
 		private long collected;
+		private boolean checkForProceedFile;
+		private File proceedFile;
+		private long stepSize;
+		private long congruence;
 
 		public SleepyDurableGenerateSequence(File coordinateDir, long end) {
 			this.coordinateDir = coordinateDir;
@@ -152,36 +155,39 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 
 		@Override
-		public void run(Collector<Long> collector) throws Exception {
-
-			StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-
-			final long stepSize = context.getNumberOfParallelSubtasks();
-			final long congruence = context.getIndexOfThisSubtask();
-			final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
-
-			final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
-			boolean checkForProceedFile = true;
-
-			while (collected < toCollect) {
-				// check if the proceed file exists (then we go full speed)
-				// if not, we always recheck and sleep
-				if (checkForProceedFile) {
-					if (proceedFile.exists()) {
-						checkForProceedFile = false;
-					} else {
-						// otherwise wait so that we make slow progress
-						Thread.sleep(SLEEP_TIME);
-					}
-				}
+		@SuppressWarnings("unchecked")
+		public void open(Configuration config) {
+			stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+			congruence = getRuntimeContext().getIndexOfThisSubtask();
+			toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+			collected = 0L;
+
+			proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
+			checkForProceedFile = true;
+		}
 
-				collector.collect(collected * stepSize + congruence);
-				collected++;
-			}
+		@Override
+		public boolean reachedEnd() throws Exception {
+			return collected >= toCollect;
 		}
 
 		@Override
-		public void cancel() {}
+		public Long next() throws Exception {
+			// check if the proceed file exists (then we go full speed)
+			// if not, we always recheck and sleep
+			if (checkForProceedFile) {
+				if (proceedFile.exists()) {
+					checkForProceedFile = false;
+				} else {
+					// otherwise wait so that we make slow progress
+					Thread.sleep(SLEEP_TIME);
+				}
+			}
+
+			long result = collected * stepSize + congruence;
+			collected++;
+			return result;
+		}
 
 		@Override
 		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {