You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this export.
Posted to commits@flink.apache.org by al...@apache.org on 2015/05/19 16:38:00 UTC
[3/7] flink git commit: [FLINK-1977] Rework Stream Operators to
always be push based
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index c78ec34..fd52624 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.flink.streaming.examples.windowing;
@@ -26,18 +26,17 @@ import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Random;
/**
- * An example of grouped stream windowing where different eviction and trigger
- * policies can be used. A source fetches events from cars every 1 sec
- * containing their id, their current speed (kmh), overall elapsed distance (m)
- * and a timestamp. The streaming example triggers the top speed of each car
- * every x meters elapsed for the last y seconds.
- */
+* An example of grouped stream windowing where different eviction and trigger
+* policies can be used. A source fetches events from cars every 1 sec
+* containing their id, their current speed (kmh), overall elapsed distance (m)
+* and a timestamp. The streaming example triggers the top speed of each car
+* every x meters elapsed for the last y seconds.
+*/
public class TopSpeedWindowingExample {
public static void main(String[] args) throws Exception {
@@ -75,8 +74,7 @@ public class TopSpeedWindowingExample {
env.execute("CarTopSpeedWindowingExample");
}
- private static class CarSource implements
- SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+ private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
private static final long serialVersionUID = 1L;
private Integer[] speeds;
@@ -84,6 +82,8 @@ public class TopSpeedWindowingExample {
private Random rand = new Random();
+ private int carId = 0;
+
private CarSource(int numOfCars) {
speeds = new Integer[numOfCars];
distances = new Double[numOfCars];
@@ -96,28 +96,27 @@ public class TopSpeedWindowingExample {
}
@Override
- public void run(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
- throws Exception {
-
- while (true) {
- Thread.sleep(1000);
- for (int carId = 0; carId < speeds.length; carId++) {
- if (rand.nextBoolean()) {
- speeds[carId] = Math.min(100, speeds[carId] + 5);
- } else {
- speeds[carId] = Math.max(0, speeds[carId] - 5);
- }
- distances[carId] += speeds[carId] / 3.6d;
- Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
- speeds[carId], distances[carId], System.currentTimeMillis());
- collector.collect(record);
- }
- }
+ public boolean reachedEnd() throws Exception {
+ return false;
}
@Override
- public void cancel() {
+ public Tuple4<Integer, Integer, Double, Long> next() throws Exception {
+ if (rand.nextBoolean()) {
+ speeds[carId] = Math.min(100, speeds[carId] + 5);
+ } else {
+ speeds[carId] = Math.max(0, speeds[carId] - 5);
+ }
+ distances[carId] += speeds[carId] / 3.6d;
+ Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+ speeds[carId], distances[carId], System.currentTimeMillis());
+ carId++;
+ if (carId >= speeds.length) {
+ carId = 0;
+ }
+ return record;
}
+
}
private static class ParseCarData extends
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
index 83945a9..4a6929e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -86,10 +86,10 @@ object StockPrices {
})
//Generate other stock streams
- val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
- val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
- val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
- val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
+ val SPX_Stream = env.addSource(generateStock("SPX")(10))
+ val FTSE_Stream = env.addSource(generateStock("FTSE")(20))
+ val DJI_Stream = env.addSource(generateStock("DJI")(30))
+ val BUX_Stream = env.addSource(generateStock("BUX")(40))
//Merge all stock streams together
val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
@@ -118,7 +118,7 @@ object StockPrices {
//Step 4
//Read a stream of tweets and extract the stock symbols
- val tweetStream = env.addSource(generateTweets _)
+ val tweetStream = env.addSource(generateTweets)
val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
.map(_.toUpperCase())
@@ -183,25 +183,24 @@ object StockPrices {
}
}
- def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
+ def generateStock(symbol: String)(sigma: Int) = {
var price = 1000.0
- while (true) {
+ () =>
price = price + Random.nextGaussian * sigma
- out.collect(StockPrice(symbol, price))
Thread.sleep(Random.nextInt(200))
- }
+ StockPrice(symbol, price)
+
}
def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
num.toDouble(ts.sum) / ts.size
}
- def generateTweets(out: Collector[String]) = {
- while (true) {
+ def generateTweets = {
+ () =>
val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
- out.collect(s.mkString(" "))
Thread.sleep(Random.nextInt(500))
- }
+ s.mkString(" ")
}
private def parseParameters(args: Array[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
index d1fa9c6..7b72a23 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/TopSpeedWindowingExampleITCase.java
@@ -17,8 +17,8 @@
package org.apache.flink.streaming.examples.test.windowing;
-import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
import org.apache.flink.streaming.examples.windowing.TopSpeedWindowingExample;
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2997f2d..08b2535 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,45 +18,25 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
- SingleOutputStreamOperator, GroupedDataStream}
-import org.apache.flink.streaming.util.serialization.SerializationSchema
-
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, FoldFunction, MapFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, GroupedDataStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.functions.aggregation.SumFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
-
-import scala.collection.JavaConversions._
-
-import java.util.HashMap
-
-import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
-import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamReduce
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce
-import org.apache.flink.streaming.api.operators.StreamFlatMap
-import org.apache.flink.streaming.api.operators.StreamGroupedFold
-import org.apache.flink.streaming.api.operators.StreamMap
-import org.apache.flink.streaming.api.operators.StreamFold
+import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.util.serialization.SerializationSchema
+import org.apache.flink.util.Collector
class DataStream[T](javaStream: JavaStream[T]) {
@@ -404,8 +384,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
aggregate(aggregationType, position)
}
- private def aggregate(aggregationType: AggregationType, position: Int):
- DataStream[T] = {
+ private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
val jStream = javaStream.asInstanceOf[JavaStream[Product]]
val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -421,12 +400,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
val invokable = jStream match {
- case groupedStream: GroupedDataStream[_] => new StreamGroupedReduce(reducer,
+ case groupedStream: GroupedDataStream[Product] => new StreamGroupedReduce[Product](reducer,
groupedStream.getKeySelector())
case _ => new StreamReduce(reducer)
}
- new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
- invokable)).asInstanceOf[DataStream[T]]
+ new DataStream[Product](jStream.transform("aggregation", jStream.getType(),invokable))
+ .asInstanceOf[DataStream[T]]
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index cbb5fb7..21b2e71 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,14 +18,18 @@
package org.apache.flink.streaming.api.scala
+import scala.reflect.ClassTag
+
import com.esotericsoftware.kryo.Serializer
+import org.apache.commons.lang.Validate
+import org.joda.time.Instant
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
-import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -295,14 +299,14 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* source functionality.
*
*/
- def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+ def addSource[T: ClassTag: TypeInformation](function: () => T): DataStream[T] = {
require(function != null, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
val cleanFun = StreamExecutionEnvironment.clean(function)
- override def run(out: Collector[T]) {
- cleanFun(out)
- }
- override def cancel() = {}
+
+ override def reachedEnd(): Boolean = false
+
+ override def next(): T = cleanFun()
}
addSource(sourceFunction)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index fd70bfb..124a1fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -107,8 +106,8 @@ public class StreamCheckpointingITCase {
private Random rnd;
private StringBuilder stringBuilder;
+ private int index;
private int step;
-
private boolean running = true;
@Override
@@ -116,24 +115,26 @@ public class StreamCheckpointingITCase {
rnd = new Random();
stringBuilder = new StringBuilder();
step = getRuntimeContext().getNumberOfParallelSubtasks();
+ index = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
- public void run(Collector<String> collector) throws Exception {
- for (long i = getRuntimeContext().getIndexOfThisSubtask(); running && i < NUM_STRINGS; i += step) {
- char first = (char) ((i % 40) + 40);
-
- stringBuilder.setLength(0);
- stringBuilder.append(first);
-
- collector.collect(randomString(stringBuilder, rnd));
- }
+ public boolean reachedEnd() throws Exception {
+ return index >= NUM_STRINGS;
}
@Override
- public void cancel() {
- running = false;
+ public String next() throws Exception {
+ char first = (char) ((index % 40) + 40);
+
+ stringBuilder.setLength(0);
+ stringBuilder.append(first);
+
+ String result = randomString(stringBuilder, rnd);
+ index += step;
+ return result;
}
+
});
stream
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index c3a6314..627016c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
import java.io.BufferedReader;
import java.io.File;
@@ -143,8 +141,13 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
private final File coordinateDir;
private final long end;
-
+
+ private long toCollect;
private long collected;
+ private boolean checkForProceedFile;
+ private File proceedFile;
+ private long stepSize;
+ private long congruence;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
@@ -152,36 +155,39 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
@Override
- public void run(Collector<Long> collector) throws Exception {
-
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-
- final long stepSize = context.getNumberOfParallelSubtasks();
- final long congruence = context.getIndexOfThisSubtask();
- final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
-
- final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
- boolean checkForProceedFile = true;
-
- while (collected < toCollect) {
- // check if the proceed file exists (then we go full speed)
- // if not, we always recheck and sleep
- if (checkForProceedFile) {
- if (proceedFile.exists()) {
- checkForProceedFile = false;
- } else {
- // otherwise wait so that we make slow progress
- Thread.sleep(SLEEP_TIME);
- }
- }
+ @SuppressWarnings("unchecked")
+ public void open(Configuration config) {
+ stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+ congruence = getRuntimeContext().getIndexOfThisSubtask();
+ toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+ collected = 0L;
+
+ proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
+ checkForProceedFile = true;
+ }
- collector.collect(collected * stepSize + congruence);
- collected++;
- }
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return collected >= toCollect;
}
@Override
- public void cancel() {}
+ public Long next() throws Exception {
+ // check if the proceed file exists (then we go full speed)
+ // if not, we always recheck and sleep
+ if (checkForProceedFile) {
+ if (proceedFile.exists()) {
+ checkForProceedFile = false;
+ } else {
+ // otherwise wait so that we make slow progress
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ long result = collected * stepSize + congruence;
+ collected++;
+ return result;
+ }
@Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {