Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:31 UTC
[15/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
deleted file mode 100644
index 6855e00..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ /dev/null
@@ -1,767 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
-import org.apache.flink.api.common.io.OutputFormat
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
-import org.apache.flink.core.fs.{FileSystem, Path}
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
-import org.apache.flink.streaming.util.serialization.SerializationSchema
-import org.apache.flink.util.Collector
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-class DataStream[T](javaStream: JavaStream[T]) {
-
- /**
- * Gets the underlying Java DataStream object.
- */
- def getJavaStream: JavaStream[T] = javaStream
-
- /**
- * Returns the ID of the DataStream.
- *
- * @return ID of the DataStream
- */
- def getId = javaStream.getId
-
- /**
- * Returns the TypeInformation for the elements of this DataStream.
- */
- def getType(): TypeInformation[T] = javaStream.getType()
-
- /**
- * Sets the parallelism of this operation. This must be at least 1.
- */
- def setParallelism(parallelism: Int): DataStream[T] = {
- javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
- case _ =>
- throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " +
- "have " +
- "parallelism.")
- }
- this
- }
-
- /**
- * Returns the parallelism of this operation.
- */
- def getParallelism = javaStream.getParallelism
-
- /**
- * Returns the execution config.
- */
- def getExecutionConfig = javaStream.getExecutionConfig
-
- /**
- * Gets the name of the current data stream. This name is
- * used by the visualization and logging during runtime.
- *
- * @return Name of the stream.
- */
-  def getName : String = javaStream match {
-    case stream : SingleOutputStreamOperator[T, _] => stream.getName
-    case _ => throw new UnsupportedOperationException("Only supported for operators.")
-  }
-
- /**
- * Sets the name of the current data stream. This name is
- * used by the visualization and logging during runtime.
- *
- * @return The named operator
- */
-  def name(name: String) : DataStream[T] = {
-    javaStream match {
-      case stream : SingleOutputStreamOperator[T, _] => stream.name(name)
-      case _ => throw new UnsupportedOperationException("Only supported for operators.")
-    }
-    this
-  }
-
- /**
- * Turns off chaining for this operator so thread co-location will not be
- * used as an optimization.
- *
- * Chaining can be turned off for the whole job via
- * [[StreamExecutionEnvironment.disableOperatorChaining()]]; however, this is
- * not advised for performance reasons.
- *
- */
- def disableChaining(): DataStream[T] = {
- javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
- case _ =>
- throw new UnsupportedOperationException("Only supported for operators.")
- }
- this
- }
-
- /**
- * Starts a new task chain beginning at this operator. This operator will
- * not be chained (thread co-located for increased performance) to any
- * previous tasks even if possible.
- *
- */
- def startNewChain(): DataStream[T] = {
- javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
- case _ =>
- throw new UnsupportedOperationException("Only supported for operators.")
- }
- this
- }
-
- /**
- * Isolates the operator in its own resource group. This will cause the
- * operator to grab as many task slots as its degree of parallelism. If
- * there are no free resources available, the job will fail to start.
- * All subsequent operators are assigned to the default resource group.
- *
- */
- def isolateResources(): DataStream[T] = {
- javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
- case _ =>
- throw new UnsupportedOperationException("Only supported for operators.")
- }
- this
- }
-
- /**
- * By default all operators in a streaming job share the same resource
- * group. Each resource group takes as many task manager slots as the
- * maximum parallelism operator in that group. By calling this method, this
- * operator starts a new resource group and all subsequent operators will
- * be added to this group unless specified otherwise. Please note that
- * local executions have by default as many available task slots as the
- * environment parallelism, so in order to start a new resource group the
- * degree of parallelism for the operators must be decreased from the
- * default.
- */
- def startNewResourceGroup(): DataStream[T] = {
- javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
- case _ =>
- throw new UnsupportedOperationException("Only supported for operators.")
- }
- this
- }
-
- /**
- * Sets the maximum time frequency (ms) for the flushing of the output
- * buffer. By default the output buffers flush only when they are full.
- *
- * @param timeoutMillis
- * The maximum time between two output flushes.
- * @return The operator with buffer timeout set.
- */
- def setBufferTimeout(timeoutMillis: Long): DataStream[T] = {
- javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis);
- case _ =>
- throw new UnsupportedOperationException("Only supported for operators.")
- }
- this
- }
-
- /**
- * Creates a new DataStream by merging DataStream outputs of
- * the same type with each other. The DataStreams merged using this operator
- * will be transformed simultaneously.
- *
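- * A usage sketch (hypothetical integer streams; `...` stands for any source):
- * {{{
- *   val first: DataStream[Int] = ...
- *   val second: DataStream[Int] = ...
- *   val all: DataStream[Int] = first.union(second)
- * }}}
- *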
- */
- def union(dataStreams: DataStream[T]*): DataStream[T] =
- javaStream.union(dataStreams.map(_.getJavaStream): _*)
-
- /**
- * Creates a new ConnectedStreams by connecting
- * DataStream outputs of different types with each other. The
- * DataStreams connected using this operator can be used with CoFunctions.
- */
- def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
- javaStream.connect(dataStream.getJavaStream)
-
- /**
- * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
- * be used with grouped operators like grouped reduce or grouped aggregations.
- */
- def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = javaStream.keyBy(fields: _*)
-
- /**
- * Groups the elements of a DataStream by the given field expressions to
- * be used with grouped operators like grouped reduce or grouped aggregations.
- */
- def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
- javaStream.keyBy(firstField +: otherFields.toArray: _*)
-
- /**
- * Groups the elements of a DataStream by the given K key to
- * be used with grouped operators like grouped reduce or grouped aggregations.
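- *
- * A usage sketch (hypothetical stream of (word, count) pairs, keyed by the word field):
- * {{{
- *   val pairs: DataStream[(String, Int)] = ...
- *   val keyed: KeyedStream[(String, Int), String] = pairs.keyBy(_._1)
- * }}}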
- */
- def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
-
- val cleanFun = clean(fun)
- val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
-
- val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
- def getKey(in: T) = cleanFun(in)
- override def getProducedType: TypeInformation[K] = keyType
- }
- new JavaKeyedStream(javaStream, keyExtractor, keyType)
- }
-
- /**
- * Partitions the elements of a DataStream by the given key positions (for tuple/array
- * types) using hash partitioning.
- */
- def partitionByHash(fields: Int*): DataStream[T] = javaStream.partitionByHash(fields: _*)
-
- /**
- * Partitions the elements of a DataStream by the given field expressions
- * using hash partitioning.
- */
- def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
- javaStream.partitionByHash(firstField +: otherFields.toArray: _*)
-
- /**
- * Partitions the elements of a DataStream by the given key extractor function
- * using hash partitioning.
- */
- def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
-
- val cleanFun = clean(fun)
- val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
- def getKey(in: T) = cleanFun(in)
- override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
- }
- javaStream.partitionByHash(keyExtractor)
- }
-
- /**
- * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
- * This method takes the key position to partition on, and a partitioner that accepts the key
- * type.
- * <p>
- * Note: This method works only on single field keys.
- */
- def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
- javaStream.partitionCustom(partitioner, field)
-
- /**
- * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
- * This method takes the key expression to partition on, and a partitioner that accepts the key
- * type.
- * <p>
- * Note: This method works only on single field keys.
- */
- def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
- : DataStream[T] = javaStream.partitionCustom(partitioner, field)
-
- /**
- * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
- * This method takes the key selector to get the key to partition on, and a partitioner that
- * accepts the key type.
- * <p>
- * Note: This method works only on single field keys, i.e. the selector cannot return tuples
- * of fields.
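- *
- * A sketch (hypothetical `stream` and a modulo partitioner over Int keys):
- * {{{
- *   val byModulo = new Partitioner[Int] {
- *     override def partition(key: Int, numPartitions: Int): Int =
- *       math.abs(key) % numPartitions
- *   }
- *   val partitioned = stream.partitionCustom(byModulo, _.hashCode)
- * }}}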
- */
- def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
- : DataStream[T] = {
- val cleanFun = clean(fun)
- val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
- def getKey(in: T) = cleanFun(in)
- override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
- }
- javaStream.partitionCustom(partitioner, keyExtractor)
- }
-
- /**
- * Sets the partitioning of the DataStream so that the output tuples
- * are broadcast to every parallel instance of the next component. This
- * setting only affects how the outputs will be distributed between the
- * parallel instances of the next processing operator.
- *
- */
- def broadcast: DataStream[T] = javaStream.broadcast()
-
- /**
- * Sets the partitioning of the DataStream so that the output values all go to
- * the first instance of the next processing operator. Use this setting with care
- * since it might cause a serious performance bottleneck in the application.
- */
- def global: DataStream[T] = javaStream.global()
-
- /**
- * Sets the partitioning of the DataStream so that the output tuples
- * are shuffled to the next component. This setting only affects how the
- * outputs will be distributed between the parallel instances of the next
- * processing operator.
- *
- */
- def shuffle: DataStream[T] = javaStream.shuffle()
-
- /**
- * Sets the partitioning of the DataStream so that the output tuples
- * are forwarded to the local subtask of the next component (whenever
- * possible). This is the default partitioner setting. This setting only
- * affects how the outputs will be distributed between the parallel
- * instances of the next processing operator.
- *
- */
- def forward: DataStream[T] = javaStream.forward()
-
- /**
- * Sets the partitioning of the DataStream so that the output tuples
- * are distributed evenly to the next component. This setting only affects
- * how the outputs will be distributed between the parallel instances of
- * the next processing operator.
- *
- */
- def rebalance: DataStream[T] = javaStream.rebalance()
-
- /**
- * Initiates an iterative part of the program that creates a loop by feeding
- * back data streams. To create a streaming iteration the user needs to define
- * a transformation that creates two DataStreams. The first one is the output
- * that will be fed back to the start of the iteration and the second is the output
- * stream of the iterative part.
- * <p>
- * stepfunction: initialStream => (feedback, output)
- * <p>
- * A common pattern is to use output splitting to create feedback and output DataStream.
- * Please refer to the .split(...) method of the DataStream
- * <p>
- * By default a DataStream with iteration will never terminate, but the user
- * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
- * If no data is received within the set time, the stream terminates.
- * <p>
- * By default the feedback partitioning is set to match the input. To override this,
- * set the keepPartitioning flag to true.
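- *
- * A sketch (assumes env is the StreamExecutionEnvironment; values above zero are
- * fed back, the rest are emitted as output):
- * {{{
- *   val result = env.generateSequence(1, 10).iterate(stream => {
- *     val decremented = stream.map(_ - 1)
- *     (decremented.filter(_ > 0), decremented.filter(_ <= 0))
- *   })
- * }}}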
- *
- */
- def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
- maxWaitTimeMillis:Long = 0,
- keepPartitioning: Boolean = false) : DataStream[R] = {
- val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
-
- val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
- iterativeStream.closeWith(feedback.getJavaStream)
- output
- }
-
- /**
- * Initiates an iterative part of the program that creates a loop by feeding
- * back data streams. To create a streaming iteration the user needs to define
- * a transformation that creates two DataStreams. The first one is the output
- * that will be fed back to the start of the iteration and the second is the output
- * stream of the iterative part.
- *
- * The input stream of the iterate operator and the feedback stream will be treated
- * as a ConnectedStreams where the input is connected with the feedback stream.
- *
- * This allows the user to distinguish standard input from feedback inputs.
- *
- * <p>
- * stepfunction: initialStream => (feedback, output)
- * <p>
- * The user must set the max waiting time for the iteration head.
- * If no data is received within the set time, the stream terminates. If this parameter is
- * set to 0, the iteration sources run indefinitely, so the job must be killed to stop.
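- *
- * A sketch (assumes input is a hypothetical DataStream[Long]; Strings are fed back,
- * Longs are emitted):
- * {{{
- *   val result = input.iterate((connected: ConnectedStreams[Long, String]) => {
- *     val feedback = connected.map(l => l.toString, s => s)
- *     val output = connected.map(l => l, s => s.length.toLong)
- *     (feedback, output)
- *   }, 5000)
- * }}}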
- *
- */
- def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
- (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
- val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
- val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis).
- withFeedbackType(feedbackType)
-
- val (feedback, output) = stepFunction(connectedIterativeStream)
- connectedIterativeStream.closeWith(feedback.getJavaStream)
- output
- }
-
- /**
- * Creates a new DataStream by applying the given function to every element of this DataStream.
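- *
- * A usage sketch (assumes env is the StreamExecutionEnvironment):
- * {{{
- *   val doubled: DataStream[Int] = env.fromElements(1, 2, 3).map(_ * 2)
- * }}}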
- */
- def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Map function must not be null.")
- }
- val cleanFun = clean(fun)
- val mapper = new MapFunction[T, R] {
- def map(in: T): R = cleanFun(in)
- }
-
- map(mapper)
- }
-
- /**
- * Creates a new DataStream by applying the given function to every element of this DataStream.
- */
- def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
- if (mapper == null) {
- throw new NullPointerException("Map function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Creates a new DataStream by applying the given function to every element and flattening
- * the results.
- */
- def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
- if (flatMapper == null) {
- throw new NullPointerException("FlatMap function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Creates a new DataStream by applying the given function to every element and flattening
- * the results.
- */
- def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("FlatMap function must not be null.")
- }
- val cleanFun = clean(fun)
- val flatMapper = new FlatMapFunction[T, R] {
- def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
- }
- flatMap(flatMapper)
- }
-
- /**
- * Creates a new DataStream by applying the given function to every element and flattening
- * the results.
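- *
- * A usage sketch, splitting hypothetical text lines into words:
- * {{{
- *   val lines: DataStream[String] = ...
- *   val words: DataStream[String] = lines.flatMap(_.split(" "))
- * }}}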
- */
- def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("FlatMap function must not be null.")
- }
- val cleanFun = clean(fun)
- val flatMapper = new FlatMapFunction[T, R] {
- def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
- }
- flatMap(flatMapper)
- }
-
- /**
- * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
- */
- def filter(filter: FilterFunction[T]): DataStream[T] = {
- if (filter == null) {
- throw new NullPointerException("Filter function must not be null.")
- }
- javaStream.filter(filter)
- }
-
- /**
- * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
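- *
- * A usage sketch (assumes env is the StreamExecutionEnvironment), keeping only even numbers:
- * {{{
- *   val evens = env.fromElements(1, 2, 3, 4).filter(_ % 2 == 0)
- * }}}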
- */
- def filter(fun: T => Boolean): DataStream[T] = {
- if (fun == null) {
- throw new NullPointerException("Filter function must not be null.")
- }
- val cleanFun = clean(fun)
- val filter = new FilterFunction[T] {
- def filter(in: T) = cleanFun(in)
- }
- this.filter(filter)
- }
-
- /**
- * Windows this DataStream into tumbling time windows.
- *
- * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
- * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
- * set using
- * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
- *
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows, is
- * it possible to perform this operation in parallel).
- *
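- * A usage sketch (hypothetical `stream`; Time.of comes from the windowing.time package):
- * {{{
- *   val windowed = stream.timeWindowAll(Time.of(5, TimeUnit.SECONDS))
- * }}}
- *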
- * @param size The size of the window.
- */
- def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
- val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
- windowAll(assigner)
- }
-
- /**
- * Windows this DataStream into sliding time windows.
- *
- * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or
- * `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic
- * set using
- * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
- *
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows, is
- * it possible to perform this operation in parallel).
- *
- * @param size The size of the window.
- * @param slide The slide interval of the window.
- */
- def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
- val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
- windowAll(assigner)
- }
-
- /**
- * Windows this [[DataStream]] into sliding count windows.
- *
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows, is
- * it possible to perform this operation in parallel).
- *
- * @param size The size of the windows in number of elements.
- * @param slide The slide interval in number of elements.
- */
- def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = {
- new AllWindowedStream(javaStream.countWindowAll(size, slide))
- }
-
- /**
- * Windows this [[DataStream]] into tumbling count windows.
- *
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows, is
- * it possible to perform this operation in parallel).
- *
- * @param size The size of the windows in number of elements.
- */
- def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
- new AllWindowedStream(javaStream.countWindowAll(size))
- }
-
- /**
- * Windows this data stream to an [[AllWindowedStream]], which evaluates windows
- * over a non key grouped stream. Elements are put into windows by a
- * [[WindowAssigner]]. The grouping of elements is done by window.
- *
- * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
- * when windows are evaluated. However, each `WindowAssigner` has a default `Trigger`
- * that is used if a `Trigger` is not specified.
- *
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows, is
- * it possible to perform this operation in parallel).
- *
- * @param assigner The `WindowAssigner` that assigns elements to windows.
- * @return The trigger windows data stream.
- */
- def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
- new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](javaStream, assigner))
- }
-
- /**
- * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
- * The internal timestamps are, for example, used for event-time window operations.
- *
- * If you know that the timestamps are strictly increasing you can use an
- * [[org.apache.flink.streaming.api.functions.AscendingTimestampExtractor]]. Otherwise,
- * you should provide a [[TimestampExtractor]] that also implements
- * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
- javaStream.assignTimestamps(clean(extractor))
- }
-
- /**
- * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
- * The internal timestamps are, for example, used for event-time window operations.
- *
- * If you know that the timestamps are strictly increasing you can use an
- * [[org.apache.flink.streaming.api.functions.AscendingTimestampExtractor]]. Otherwise,
- * you should provide a [[TimestampExtractor]] that also implements
- * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
- *
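- * A sketch (assumes events whose first tuple field is a monotonically increasing
- * timestamp in milliseconds):
- * {{{
- *   val events: DataStream[(Long, String)] = ...
- *   val withTimestamps = events.assignAscendingTimestamps(_._1)
- * }}}
- *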
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
- val cleanExtractor = clean(extractor)
- val extractorFunction = new AscendingTimestampExtractor[T] {
- def extractAscendingTimestamp(element: T, currentTimestamp: Long): Long = {
- cleanExtractor(element)
- }
- }
- javaStream.assignTimestamps(extractorFunction)
- }
-
- /**
- *
- * Operator used for directing tuples to specific named outputs using an
- * OutputSelector. Calling this method on an operator creates a new
- * [[SplitStream]].
- */
- def split(selector: OutputSelector[T]): SplitStream[T] = javaStream.split(selector)
-
- /**
- * Creates a new [[SplitStream]] that routes each element to the named
- * outputs selected by the given output selector function.
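- *
- * A usage sketch (assumes env is the StreamExecutionEnvironment), routing even
- * and odd numbers to named outputs:
- * {{{
- *   val parts: SplitStream[Int] = env.fromElements(1, 2, 3, 4)
- *     .split(n => if (n % 2 == 0) Seq("even") else Seq("odd"))
- *   val evens: DataStream[Int] = parts.select("even")
- * }}}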
- */
- def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
- if (fun == null) {
- throw new NullPointerException("OutputSelector must not be null.")
- }
- val cleanFun = clean(fun)
- val selector = new OutputSelector[T] {
- def select(in: T): java.lang.Iterable[String] = {
- cleanFun(in).toIterable.asJava
- }
- }
- split(selector)
- }
-
- /**
- * Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys
- * and window can be specified.
- */
- def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams.Unspecified[T, T2] = {
- CoGroupedStreams.createCoGroup(this, otherStream)
- }
-
- /**
- * Creates a join operation. See [[JoinedStreams]] for an example of how the keys
- * and window can be specified.
- */
- def join[T2](otherStream: DataStream[T2]): JoinedStreams.Unspecified[T, T2] = {
- JoinedStreams.createJoin(this, otherStream)
- }
-
- /**
- * Writes a DataStream to the standard output stream (stdout). For each
- * element of the DataStream the result of .toString is
- * written.
- *
- */
- def print(): DataStreamSink[T] = javaStream.print()
-
- /**
- * Writes a DataStream to the standard error stream (stderr).
- *
- * For each element of the DataStream the result of
- * [[AnyRef.toString()]] is written.
- *
- * @return The closed DataStream.
- */
- def printToErr() = javaStream.printToErr()
-
- /**
- * Writes a DataStream to the file specified by path in text format. The
- * writing is performed periodically, every millis milliseconds. For
- * every element of the DataStream the result of .toString
- * is written.
- *
- */
- def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
- javaStream.writeAsText(path, millis)
-
- /**
- * Writes a DataStream to the file specified by path in CSV format. The
- * writing is performed periodically, every millis milliseconds. For
- * every element of the DataStream the result of .toString
- * is written.
- *
- */
- def writeAsCsv(
- path: String,
- millis: Long = 0,
- rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
- fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
- writeMode: FileSystem.WriteMode = null): DataStreamSink[T] = {
-    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataStreams.")
- val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, fieldDelimiter)
- if (writeMode != null) {
- of.setWriteMode(writeMode)
- }
- javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
- }
-
- /**
- * Writes a DataStream using the given [[OutputFormat]]. The
- * writing is performed periodically, every millis milliseconds.
- */
- def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
- javaStream.write(format, millis)
- }
-
- /**
- * Writes the DataStream to a socket as a byte array. The format of the output is
- * specified by a [[SerializationSchema]].
- */
- def writeToSocket(
- hostname: String,
- port: Integer,
- schema: SerializationSchema[T, Array[Byte]]): DataStreamSink[T] = {
- javaStream.writeToSocket(hostname, port, schema)
- }
-
- /**
- * Adds the given sink to this DataStream. Only streams with sinks added
- * will be executed once the StreamExecutionEnvironment.execute(...)
- * method is called.
- *
- */
- def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] =
- javaStream.addSink(sinkFunction)
-
- /**
- * Adds the given sink to this DataStream. Only streams with sinks added
- * will be executed once the StreamExecutionEnvironment.execute(...)
- * method is called.
- *
- */
- def addSink(fun: T => Unit): DataStreamSink[T] = {
- if (fun == null) {
- throw new NullPointerException("Sink function must not be null.")
- }
- val cleanFun = clean(fun)
- val sinkFunction = new SinkFunction[T] {
- def invoke(in: T) = cleanFun(in)
- }
- this.addSink(sinkFunction)
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
deleted file mode 100644
index c259724..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.reflect.ClassTag
-
-/**
- * `JoinedStreams` represents two [[DataStream]]s that have been joined.
- * A streaming join operation is evaluated over elements in a window.
- *
- * To finalize the join operation you also need to specify a [[KeySelector]] for
- * both the first and second input and a [[WindowAssigner]]
- *
- * Note: Right now, the groups are being built in memory so you need to ensure that they don't
- * get too big. Otherwise the JVM might crash.
- *
- * Example:
- *
- * {{{
- * val one: DataStream[(String, Int)] = ...
- * val two: DataStream[(String, Int)] = ...
- *
- * val result = one.join(two)
- *     .where { t => ... }
- *     .equalTo { t => ... }
- * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
- * .apply(new MyJoinFunction())
- * }}}
- */
-object JoinedStreams {
-
- /**
- * A join operation that does not yet have its [[KeySelector]]s defined.
- *
- * @tparam T1 Type of the elements from the first input
- * @tparam T2 Type of the elements from the second input
- */
- class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
- /**
- * Specifies a [[KeySelector]] for elements from the first input.
- */
- def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val keyType = implicitly[TypeInformation[KEY]]
- val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T1) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = keyType
- }
- new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
- }
-
- /**
- * Specifies a [[KeySelector]] for elements from the second input.
- */
- def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val keyType = implicitly[TypeInformation[KEY]]
- val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T2) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = keyType
- }
- new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
- }
- }
-
- /**
- * A join operation that has [[KeySelector]]s defined for either both or
- * one input.
- *
- * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
- * before you can proceed with specifying a [[WindowAssigner]] using [[window()]].
- *
- * @tparam T1 Type of the elements from the first input
- * @tparam T2 Type of the elements from the second input
- * @tparam KEY Type of the key. This must be the same for both inputs
- */
- class WithKey[T1, T2, KEY](
- input1: DataStream[T1],
- input2: DataStream[T2],
- keySelector1: KeySelector[T1, KEY],
- keySelector2: KeySelector[T2, KEY],
- keyType: TypeInformation[KEY]) {
-
- /**
- * Specifies a [[KeySelector]] for elements from the first input.
- */
- def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val localKeyType = keyType
- val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T1) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = localKeyType
- }
- new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, localKeyType)
- }
-
- /**
- * Specifies a [[KeySelector]] for elements from the second input.
- */
- def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
- val cleanFun = clean(keySelector)
- val localKeyType = keyType
- val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
- def getKey(in: T2) = cleanFun(in)
- override def getProducedType: TypeInformation[KEY] = localKeyType
- }
- new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType)
- }
-
- /**
- * Specifies the window on which the join operation works.
- */
- def window[W <: Window](
- assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
- : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
- if (keySelector1 == null || keySelector2 == null) {
- throw new UnsupportedOperationException("You first need to specify KeySelectors for both" +
- "inputs using where() and equalTo().")
- }
- new JoinedStreams.WithWindow[T1, T2, KEY, W](
- input1,
- input2,
- keySelector1,
- keySelector2,
- clean(assigner),
- null,
- null)
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
- }
- }
-
- /**
- * A join operation that has [[KeySelector]]s defined for both inputs as
- * well as a [[WindowAssigner]].
- *
- * @tparam T1 Type of the elements from the first input
- * @tparam T2 Type of the elements from the second input
- * @tparam KEY Type of the key. This must be the same for both inputs
- * @tparam W Type of [[Window]] on which the join operation works.
- */
- class WithWindow[T1, T2, KEY, W <: Window](
- input1: DataStream[T1],
- input2: DataStream[T2],
- keySelector1: KeySelector[T1, KEY],
- keySelector2: KeySelector[T2, KEY],
- windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
- trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
- evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
-
-
- /**
- * Sets the [[Trigger]] that should be used to trigger window emission.
- */
- def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
- : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
- new WithWindow[T1, T2, KEY, W](
- input1,
- input2,
- keySelector1,
- keySelector2,
- windowAssigner,
- newTrigger,
- evictor)
- }
-
- /**
- * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
- *
- * Note: When using an evictor, window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
- : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
- new WithWindow[T1, T2, KEY, W](
- input1,
- input2,
- keySelector1,
- keySelector2,
- windowAssigner,
- trigger,
- newEvictor)
- }
-
- /**
- * Completes the join operation with the user function that is executed
- * for windowed groups.
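- *
- * A sketch (hypothetical `joined` operation over (String, Int) inputs that sums
- * the matching values):
- * {{{
- *   val summed: DataStream[(String, Int)] =
- *     joined.apply((left, right) => (left._1, left._2 + right._2))
- * }}}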
- */
- def apply[O: TypeInformation: ClassTag](fun: (T1, T2) => O): DataStream[O] = {
- require(fun != null, "Join function must not be null.")
-
- val joiner = new FlatJoinFunction[T1, T2, O] {
- val cleanFun = clean(fun)
- def join(left: T1, right: T2, out: Collector[O]) = {
- out.collect(cleanFun(left, right))
- }
- }
- apply(joiner)
- }
-
- /**
- * Completes the join operation with the user function that is executed
- * for windowed groups.
- */
- def apply[O: TypeInformation: ClassTag](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
- require(fun != null, "Join function must not be null.")
-
- val joiner = new FlatJoinFunction[T1, T2, O] {
- val cleanFun = clean(fun)
- def join(left: T1, right: T2, out: Collector[O]) = {
- cleanFun(left, right, out)
- }
- }
- apply(joiner)
- }
-
- /**
- * Completes the join operation with the user function that is executed
- * for windowed groups.
- */
- def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
-
- val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
-
- join
- .where(keySelector1)
- .equalTo(keySelector2)
- .window(windowAssigner)
- .trigger(trigger)
- .evictor(evictor)
- .apply(clean(function), implicitly[TypeInformation[T]])
- }
-
- /**
- * Completes the join operation with the user function that is executed
- * for windowed groups.
- */
- def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
-
- val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
-
- join
- .where(keySelector1)
- .equalTo(keySelector2)
- .window(windowAssigner)
- .trigger(trigger)
- .evictor(evictor)
- .apply(clean(function), implicitly[TypeInformation[T]])
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
- }
- }
-
-
- /**
- * Creates a new join operation from the two given inputs.
- */
- def createJoin[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
- : JoinedStreams.Unspecified[T1, T2] = {
- new JoinedStreams.Unspecified[T1, T2](input1, input2)
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
deleted file mode 100644
index 9f5c069..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
-import org.apache.flink.util.Collector
-
-import scala.reflect.ClassTag
-
-
-class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the type of the key by which this stream is keyed.
- */
- def getKeyType = javaStream.getKeyType()
-
- // ------------------------------------------------------------------------
- // Windowing
- // ------------------------------------------------------------------------
-
- /**
- * Windows this [[KeyedStream]] into tumbling time windows.
- *
- * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
- * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
- * set using
- * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
- *
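- * A usage sketch (hypothetical `keyed` stream; Time.of comes from the windowing.time
- * package):
- * {{{
- *   val windowed = keyed.timeWindow(Time.of(1, TimeUnit.MINUTES))
- * }}}
- *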
- * @param size The size of the window.
- */
- def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
- val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
- window(assigner)
- }
-
- /**
- * Windows this [[KeyedStream]] into sliding count windows.
- *
- * @param size The size of the windows in number of elements.
- * @param slide The slide interval in number of elements.
- */
- def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
- new WindowedStream(javaStream.countWindow(size, slide))
- }
-
- /**
- * Windows this [[KeyedStream]] into tumbling count windows.
- *
- * @param size The size of the windows in number of elements.
- */
- def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
- new WindowedStream(javaStream.countWindow(size))
- }
-
- /**
- * Windows this [[KeyedStream]] into sliding time windows.
- *
- * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or
- * `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic
- * set using
- * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
- *
- * @param size The size of the window.
- * @param slide The slide interval of the window.
- */
- def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = {
- val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
- window(assigner)
- }
-
- /**
- * Windows this data stream to a [[WindowedStream]], which evaluates windows
- * over a key grouped stream. Elements are put into windows by a [[WindowAssigner]]. The
- * grouping of elements is done both by key and by window.
- *
- * A [[org.apache.flink.streaming.api.windowing.triggers.Trigger]] can be defined to specify
- * when windows are evaluated. However, each `WindowAssigner` has a default `Trigger`
- * that is used if a `Trigger` is not specified.
- *
- * @param assigner The `WindowAssigner` that assigns elements to windows.
- * @return The trigger windows data stream.
- */
- def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
- new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner))
- }
-
- // ------------------------------------------------------------------------
- // Non-Windowed aggregation operations
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new [[DataStream]] by reducing the elements of this DataStream
- * using an associative reduce function. An independent aggregate is kept per key.
- */
- def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
- if (reducer == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
-
- javaStream.reduce(reducer)
- }
-
- /**
- * Creates a new [[DataStream]] by reducing the elements of this DataStream
- * using an associative reduce function. An independent aggregate is kept per key.
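- *
- * A usage sketch, summing the counts of hypothetical (word, count) pairs per key:
- * {{{
- *   val summed = pairs.keyBy(_._1).reduce((a, b) => (a._1, a._2 + b._2))
- * }}}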
- */
- def reduce(fun: (T, T) => T): DataStream[T] = {
- if (fun == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
- val cleanFun = clean(fun)
- val reducer = new ReduceFunction[T] {
- def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
- }
- reduce(reducer)
- }
-
- /**
- * Creates a new [[DataStream]] by folding the elements of this DataStream
- * using an associative fold function and an initial value. An independent
- * aggregate is kept per key.
- */
- def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]):
- DataStream[R] = {
- if (folder == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
- javaStream.fold(initialValue, folder).
- returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Creates a new [[DataStream]] by folding the elements of this DataStream
- * using an associative fold function and an initial value. An independent
- * aggregate is kept per key.
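- *
- * A sketch, concatenating the string values seen per key (hypothetical `words` stream):
- * {{{
- *   val folded: DataStream[String] =
- *     words.keyBy(w => w).fold("")((acc, w) => acc + w)
- * }}}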
- */
- def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
- val cleanFun = clean(fun)
- val folder = new FoldFunction[T,R] {
- def fold(acc: R, v: T) = {
- cleanFun(acc, v)
- }
- }
- fold(initialValue, folder)
- }
-
- /**
- * Applies an aggregation that gives the current maximum of the data stream at
- * the given position by the given key. An independent aggregate is kept per key.
- *
- */
- def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
- /**
- * Applies an aggregation that gives the current maximum of the data stream at
- * the given field by the given key. An independent aggregate is kept per key.
- *
- */
- def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
- /**
- * Applies an aggregation that gives the current minimum of the data stream at
- * the given position by the given key. An independent aggregate is kept per key.
- *
- */
- def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
- /**
- * Applies an aggregation that gives the current minimum of the data stream at
- * the given field by the given key. An independent aggregate is kept per key.
- *
- */
- def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
- /**
- * Applies an aggregation that sums the data stream at the given position by the given
- * key. An independent aggregate is kept per key.
- *
- */
- def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
- /**
- * Applies an aggregation that sums the data stream at the given field by the given
- * key. An independent aggregate is kept per key.
- *
- */
- def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
- /**
- * Applies an aggregation that gives the current minimum element of the data stream by
- * the given position by the given key. An independent aggregate is kept per key.
- * In case of a tie, the first element with the minimal value is returned.
- *
- */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, position)
-
- /**
- * Applies an aggregation that gives the current minimum element of the data stream by
- * the given field by the given key. An independent aggregate is kept per key.
- * In case of a tie, the first element with the minimal value is returned.
- *
- */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field)
-
- /**
- * Applies an aggregation that gives the current maximum element of the data stream by
- * the given position by the given key. An independent aggregate is kept per key.
- * In case of a tie, the first element with the maximal value is returned.
- *
- */
- def maxBy(position: Int): DataStream[T] =
- aggregate(AggregationType.MAXBY, position)
-
- /**
- * Applies an aggregation that gives the current maximum element of the data stream by
- * the given field by the given key. An independent aggregate is kept per key.
- * In case of a tie, the first element with the maximal value is returned.
- *
- */
- def maxBy(field: String): DataStream[T] =
- aggregate(AggregationType.MAXBY, field)
-
- private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
- val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
- aggregate(aggregationType, position)
- }
-
- private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
- val reducer = aggregationType match {
- case AggregationType.SUM =>
- new SumAggregator(position, javaStream.getType, javaStream.getExecutionConfig)
- case _ =>
- new ComparableAggregator(position, javaStream.getType, aggregationType, true,
- javaStream.getExecutionConfig)
- }
-
- val invokable = new StreamGroupedReduce[T](reducer,
- getType().createSerializer(getExecutionConfig))
-
- new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
- .asInstanceOf[DataStream[T]]
- }
-
- // ------------------------------------------------------------------------
- // functions with state
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new DataStream that contains only the elements satisfying the given stateful filter
- * predicate. To use state partitioning, a key must be defined using .keyBy(..), in which case
- * an independent state will be kept per key.
- *
- * Note that the user state object needs to be serializable.
- */
- def filterWithState[S : TypeInformation](
- fun: (T, Option[S]) => (Boolean, Option[S])): DataStream[T] = {
- if (fun == null) {
- throw new NullPointerException("Filter function must not be null.")
- }
-
- val cleanFun = clean(fun)
- val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
-
- val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] {
-
- override val stateType: TypeInformation[S] = stateTypeInfo
-
- override def filter(in: T): Boolean = {
- applyWithState(in, cleanFun)
- }
- }
-
- filter(filterFun)
- }
-
- /**
- * Creates a new DataStream by applying the given stateful function to every element of this
- * DataStream. To use state partitioning, a key must be defined using .keyBy(..), in which
- * case an independent state will be kept per key.
- *
- * Note that the user state object needs to be serializable.
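- *
- * A sketch, keeping a running sum per key of (word, value) pairs (assumes `keyed`
- * is a stream keyed by the word field):
- * {{{
- *   val running: DataStream[(String, Int)] =
- *     keyed.mapWithState((in: (String, Int), sum: Option[Int]) => {
- *       val newSum = sum.getOrElse(0) + in._2
- *       ((in._1, newSum), Some(newSum))
- *     })
- * }}}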
- */
- def mapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
- fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Map function must not be null.")
- }
-
- val cleanFun = clean(fun)
- val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
-
- val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {
-
- override val stateType: TypeInformation[S] = stateTypeInfo
-
- override def map(in: T): R = {
- applyWithState(in, cleanFun)
- }
- }
-
- map(mapper)
- }
-
- /**
- * Creates a new DataStream by applying the given stateful function to every element and
- * flattening the results. To use state partitioning, a key must be defined using .keyBy(..),
- * in which case an independent state will be kept per key.
- *
- * Note that the user state object needs to be serializable.
- */
- def flatMapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
- fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Flatmap function must not be null.")
- }
-
- val cleanFun = clean(fun)
- val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
-
- val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{
-
- override val stateType: TypeInformation[S] = stateTypeInfo
-
- override def flatMap(in: T, out: Collector[R]): Unit = {
- applyWithState(in, cleanFun) foreach out.collect
- }
- }
-
- flatMap(flatMapper)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
deleted file mode 100644
index deea6f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
-
-/**
- * The SplitStream represents an operator that has been split using an
- * [[org.apache.flink.streaming.api.collector.selector.OutputSelector]]. Named outputs
- * can be selected using the [[select]] method. To apply a transformation on the whole
- * output simply call the appropriate method on this stream.
- *
- */
-class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
-
- /**
- * Sets the output names for which the next operator will receive values.
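- *
- * A usage sketch (hypothetical split stream with outputs named "even" and "odd"):
- * {{{
- *   val evens: DataStream[Int] = split.select("even")
- *   val all: DataStream[Int] = split.select("even", "odd")
- * }}}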
- */
- def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-
-}