Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:30 UTC
[14/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
deleted file mode 100644
index e953696..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.Objects
-import java.util.Objects._
-
-import com.esotericsoftware.kryo.Serializer
-import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.runtime.state.StateBackend
-import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.types.StringValue
-import org.apache.flink.util.SplittableIterator
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import _root_.scala.language.implicitConversions
-
-class StreamExecutionEnvironment(javaEnv: JavaEnv) {
-
- /**
- * Gets the config object.
- */
- def getConfig = javaEnv.getConfig
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific operations using
- * [[DataStream#setParallelism(int)]].
- */
- def setParallelism(parallelism: Int): Unit = {
- javaEnv.setParallelism(parallelism)
- }
-
- /**
- * Returns the default parallelism for this execution environment. Note that this
- * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
- */
- def getParallelism = javaEnv.getParallelism
-
- /**
- * Sets the maximum time frequency (milliseconds) for the flushing of the
- * output buffers. By default the output buffers flush frequently to provide
- * low latency and to aid smooth developer experience. Setting the parameter
- * can result in three logical modes:
- *
- * <ul>
- * <li>A positive value triggers flushing periodically with that interval in milliseconds</li>
- * <li>0 triggers flushing after every record thus minimizing latency</li>
- * <li>-1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
- * </ul>
- */
- def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
- javaEnv.setBufferTimeout(timeoutMillis)
- this
- }
-
- /**
- * Gets the default buffer timeout set for this environment
- */
- def getBufferTimeout = javaEnv.getBufferTimeout
-
- /**
- * Disables operator chaining for streaming operators. Operator chaining
- * allows non-shuffle operations to be co-located in the same thread fully
- * avoiding serialization and de-serialization.
- *
- */
- def disableOperatorChaining(): StreamExecutionEnvironment = {
- javaEnv.disableOperatorChaining()
- this
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing Settings
- // ------------------------------------------------------------------------
- /**
- * Enables checkpointing for the streaming job. The distributed state of the streaming
- * dataflow will be periodically snapshotted. In case of a failure, the streaming
- * dataflow will be restarted from the latest completed checkpoint.
- *
- * The job draws checkpoints periodically, in the given interval. The state will be
- * stored in the configured state backend.
- *
- * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
- * the moment. If the "force" parameter is set to true, the system will execute the
- * job nonetheless.
- *
- * @param interval
- * Time interval between state checkpoints in millis.
- * @param mode
- * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
- * @param force
- * If true checkpointing will be enabled for iterative jobs as well.
- */
- @deprecated
- def enableCheckpointing(interval : Long,
- mode: CheckpointingMode,
- force: Boolean) : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing(interval, mode, force)
- this
- }
-
- /**
- * Enables checkpointing for the streaming job. The distributed state of the streaming
- * dataflow will be periodically snapshotted. In case of a failure, the streaming
- * dataflow will be restarted from the latest completed checkpoint.
- *
- * The job draws checkpoints periodically, in the given interval. The system uses the
- * given [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once").
- * The state will be stored in the configured state backend.
- *
- * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
- * the moment. For that reason, iterative jobs will not be started if used
- * with enabled checkpointing. To override this mechanism, use the
- * [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
- *
- * @param interval
- * Time interval between state checkpoints in milliseconds.
- * @param mode
- * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
- */
- def enableCheckpointing(interval : Long,
- mode: CheckpointingMode) : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing(interval, mode)
- this
- }
-
- /**
- * Enables checkpointing for the streaming job. The distributed state of the streaming
- * dataflow will be periodically snapshotted. In case of a failure, the streaming
- * dataflow will be restarted from the latest completed checkpoint.
- *
- * The job draws checkpoints periodically, in the given interval. The program will use
- * [[CheckpointingMode.EXACTLY_ONCE]] mode. The state will be stored in the
- * configured state backend.
- *
- * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
- * the moment. For that reason, iterative jobs will not be started if used
- * with enabled checkpointing. To override this mechanism, use the
- * [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
- *
- * @param interval
- * Time interval between state checkpoints in milliseconds.
- */
- def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
- enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
- }
-
- /**
- * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
- * operator states. Time interval between state checkpoints is specified in millis.
- *
- * Setting this option assumes that the job is used in production. Unless stated
- * explicitly otherwise by calling the
- * [[setNumberOfExecutionRetries(int)]] method, in case of
- * failure the job will be resubmitted to the cluster indefinitely.
- */
- def enableCheckpointing() : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing()
- this
- }
-
- def getCheckpointingMode = javaEnv.getCheckpointingMode()
-
- /**
- * Sets the state backend that describes how to store and checkpoint operator state.
- * It defines in what form the key/value state, accessible from operations on
- * [[KeyedStream]] is maintained (heap, managed memory, externally), and where state
- * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
- * functions (implementing the interface
- * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]]).
- *
- * <p>The [[org.apache.flink.streaming.api.state.memory.MemoryStateBackend]] for example
- * maintains the state in heap memory, as objects. It is lightweight without extra
- * dependencies, but can checkpoint only small states (some counters).
- *
- * <p>In contrast, the [[org.apache.flink.streaming.api.state.filesystem.FsStateBackend]]
- * stores checkpoints of the state (also maintained as heap objects) in files. When using
- * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
- * that state is not lost upon failures of individual nodes and that the entire streaming
- * program can be executed highly available and strongly consistent (assuming that Flink
- * is run in high-availability mode).
- */
- def setStateBackend(backend: StateBackend[_]): StreamExecutionEnvironment = {
- javaEnv.setStateBackend(backend)
- this
- }
-
- /**
- * Returns the state backend that defines how to store and checkpoint state.
- */
- def getStateBackend: StateBackend[_] = javaEnv.getStateBackend()
-
- /**
- * Sets the number of times that failed tasks are re-executed. A value of zero
- * effectively disables fault tolerance. A value of "-1" indicates that the system
- * default value (as defined in the configuration) should be used.
- */
- def setNumberOfExecutionRetries(numRetries: Int): Unit = {
- javaEnv.setNumberOfExecutionRetries(numRetries)
- }
-
- /**
- * Gets the number of times the system will try to re-execute failed tasks. A value
- * of "-1" indicates that the system default value (as defined in the configuration)
- * should be used.
- */
- def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
-
- /**
- * Sets the delay after which failed tasks are re-executed. A value of
- * zero means that failed tasks are re-executed immediately. A value of "-1"
- * indicates that the system default value (as defined in the configuration)
- * should be used.
- */
- def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
- javaEnv.setExecutionRetryDelay(executionRetryDelay)
- }
-
- /**
- * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
- * A value of "-1" indicates that the system default value (as defined
- * in the configuration) should be used.
- */
- def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
-
- // --------------------------------------------------------------------------------------------
- // Registry for types and serializers
- // --------------------------------------------------------------------------------------------
- /**
- * Adds a new Kryo default serializer to the Runtime.
- * <p/>
- * Note that the serializer instance must be serializable (as defined by
- * java.io.Serializable), because it may be distributed to the worker nodes
- * by java serialization.
- *
- * @param type
- * The class of the types serialized with the given serializer.
- * @param serializer
- * The serializer to use.
- */
- def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
- `type`: Class[_],
- serializer: T)
- : Unit = {
- javaEnv.addDefaultKryoSerializer(`type`, serializer)
- }
-
- /**
- * Adds a new Kryo default serializer to the Runtime.
- *
- * @param type
- * The class of the types serialized with the given serializer.
- * @param serializerClass
- * The class of the serializer to use.
- */
- def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]) {
- javaEnv.addDefaultKryoSerializer(`type`, serializerClass)
- }
-
- /**
- * Registers the given type with the serializer at the [[KryoSerializer]].
- *
- * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
- * because it may be distributed to the worker nodes by java serialization.
- */
- def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
- clazz: Class[_],
- serializer: T)
- : Unit = {
- javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers the given type with the serializer at the [[KryoSerializer]].
- */
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
- javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers the given type with the serialization stack. If the type is eventually
- * serialized as a POJO, then the type is registered with the POJO serializer. If the
- * type ends up being serialized with Kryo, then it will be registered at Kryo to make
- * sure that only tags are written.
- *
- */
- def registerType(typeClass: Class[_]) {
- javaEnv.registerType(typeClass)
- }
-
- // --------------------------------------------------------------------------------------------
- // Time characteristic
- // --------------------------------------------------------------------------------------------
- /**
- * Sets the time characteristic for all streams created from this environment, e.g., processing
- * time, event time, or ingestion time.
- *
- * If you set the characteristic to IngestionTime or EventTime this will set a default
- * watermark update interval of 200 ms. If this is not applicable for your application
- * you should change it using
- * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
- *
- * @param characteristic The time characteristic.
- */
- def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
- javaEnv.setStreamTimeCharacteristic(characteristic)
- }
-
- /**
- * Gets the time characteristic.
- *
- * @see #setStreamTimeCharacteristic
- *
- * @return The time characteristic.
- */
- def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
-
- // --------------------------------------------------------------------------------------------
- // Data stream creations
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new DataStream that contains a sequence of numbers. This source is a parallel source.
- * If you manually set the parallelism to `1` the emitted elements are in order.
- */
- def generateSequence(from: Long, to: Long): DataStream[Long] = {
- new DataStream[java.lang.Long](javaEnv.generateSequence(from, to))
- .asInstanceOf[DataStream[Long]]
- }
-
- /**
- * Creates a DataStream that contains the given elements. The elements must all be of the
- * same type.
- *
- * Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a parallelism of one.
- */
- def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
- val typeInfo = implicitly[TypeInformation[T]]
- fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
- }
-
- /**
- * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
- * because the framework may move the elements into the cluster if needed.
- *
- * Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a parallelism of one.
- */
- def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
- require(data != null, "Data must not be null.")
- val typeInfo = implicitly[TypeInformation[T]]
-
- javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
- }
-
- /**
- * Creates a DataStream from the given [[Iterator]].
- *
- * Note that this operation will result in a non-parallel data source, i.e. a data source with
- * a parallelism of one.
- */
- def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
- val typeInfo = implicitly[TypeInformation[T]]
- javaEnv.fromCollection(data.asJava, typeInfo)
- }
-
- /**
- * Creates a DataStream from the given [[SplittableIterator]].
- */
- def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
- DataStream[T] = {
- val typeInfo = implicitly[TypeInformation[T]]
- javaEnv.fromParallelCollection(data, typeInfo)
- }
-
- /**
- * Creates a DataStream that represents the Strings produced by reading the
- * given file line wise. The file will be read with the system's default
- * character set.
- *
- */
- def readTextFile(filePath: String): DataStream[String] =
- javaEnv.readTextFile(filePath)
-
- /**
- * Creates a data stream that represents the Strings produced by reading the given file
- * line wise. The character set with the given name will be used to read the files.
- */
- def readTextFile(filePath: String, charsetName: String): DataStream[String] =
- javaEnv.readTextFile(filePath, charsetName)
-
- /**
- * Creates a data stream that represents the strings produced by reading the given file
- * line wise. This method is similar to the standard text file reader, but it produces
- * a data stream with mutable StringValue objects, rather than Java Strings.
- * StringValues can be used to tune implementations to be less object and garbage
- * collection heavy. The file will be read with the system's default character set.
- */
- def readTextFileWithValue(filePath: String): DataStream[StringValue] =
- javaEnv.readTextFileWithValue(filePath)
-
- /**
- * Creates a data stream that represents the strings produced by reading the given file
- * line wise. This method is similar to the standard text file reader, but it produces
- * a data stream with mutable StringValue objects, rather than Java Strings.
- * StringValues can be used to tune implementations to be less object and garbage
- * collection heavy. The boolean flag indicates whether to skip lines that cannot
- * be read with the given character set.
- */
- def readTextFileWithValue(filePath: String, charsetName : String, skipInvalidLines : Boolean):
- DataStream[StringValue] =
- javaEnv.readTextFileWithValue(filePath, charsetName, skipInvalidLines)
-
- /**
- * Reads the given file with the given input format. The file path should be passed
- * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
- */
- def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
- DataStream[T] =
- javaEnv.readFile(inputFormat, filePath)
-
- /**
- * Creates a data stream that represents the primitive type produced by reading the given file
- * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
- * "hdfs://host:port/file/path").
- */
- def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String,
- delimiter: String = "\n", typeClass: Class[T]): DataStream[T] =
- javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
-
- /**
- * Creates a DataStream that contains the contents of files created while the
- * system watches the given path. The files will be read with the system's
- * default character set. The user can specify the monitoring interval in milliseconds
- * and the way file modifications are handled. By default it checks for new files
- * every 100 milliseconds.
- *
- */
- def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType =
- WatchType.ONLY_NEW_FILES): DataStream[String] =
- javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
-
- /**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set. The maximum retry interval is specified in seconds; in case
- * of a temporary service outage, reconnection is initiated every second.
- */
- def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
- DataStream[String] =
- javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
-
- /**
- * Generic method to create an input data stream with a specific input format.
- * Since all data streams need specific information about their types, this method needs to
- * determine the type of the data produced by the input format. It will attempt to determine the
- * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
- */
- def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
- javaEnv.createInput(inputFormat)
-
- /**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality. By default sources have a parallelism of 1.
- * To enable parallel execution, the user defined source should implement
- * ParallelSourceFunction or extend RichParallelSourceFunction.
- * In these cases the resulting source will have the parallelism of the environment.
- * To change this afterwards call DataStreamSource.setParallelism(int)
- *
- */
- def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
- require(function != null, "Function must not be null.")
- val cleanFun = scalaClean(function)
- val typeInfo = implicitly[TypeInformation[T]]
- javaEnv.addSource(cleanFun).returns(typeInfo)
- }
-
- /**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality.
- *
- */
- def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
- require(function != null, "Function must not be null.")
- val sourceFunction = new SourceFunction[T] {
- val cleanFun = scalaClean(function)
- override def run(ctx: SourceContext[T]) {
- cleanFun(ctx)
- }
- override def cancel() = {}
- }
- addSource(sourceFunction)
- }
-
- /**
- * Triggers the program execution. The environment will execute all parts of
- * the program that have resulted in a "sink" operation. Sink operations are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with a generated
- * default name.
- *
- */
- def execute() = javaEnv.execute()
-
- /**
- * Triggers the program execution. The environment will execute all parts of
- * the program that have resulted in a "sink" operation. Sink operations are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with the provided name
- *
- */
- def execute(jobName: String) = javaEnv.execute(jobName)
-
- /**
- * Creates the plan with which the system will execute the program, and
- * returns it as a String using a JSON representation of the execution data
- * flow graph. Note that this needs to be called, before the plan is
- * executed.
- *
- */
- def getExecutionPlan = javaEnv.getExecutionPlan
-
- /**
- * Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job.
- *
- * @return The StreamGraph representing the transformations
- */
- def getStreamGraph = javaEnv.getStreamGraph
-
- /**
- * Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]]
- * @return The wrapped Java StreamExecutionEnvironment
- */
- def getWrappedStreamExecutionEnvironment = javaEnv
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
- */
- private[flink] def scalaClean[F <: AnyRef](f: F): F = {
- if (getConfig.isClosureCleanerEnabled) {
- ClosureCleaner.clean(f, true)
- } else {
- ClosureCleaner.ensureSerializable(f)
- }
- f
- }
-}
-
-object StreamExecutionEnvironment {
-
- /**
- * Sets the default parallelism that will be used for the local execution
- * environment created by [[createLocalEnvironment()]].
- *
- * @param parallelism
- * The parallelism to use as the default local parallelism.
- */
- def setDefaultLocalParallelism(parallelism: Int) : Unit =
- JavaEnv.setDefaultLocalParallelism(parallelism)
-
- /**
- * Creates an execution environment that represents the context in which the program is
- * currently executed. If the program is invoked standalone, this method returns a local
- * execution environment. If the program is invoked from within the command line client
- * to be submitted to a cluster, this method returns the execution environment of this cluster.
- */
- def getExecutionEnvironment: StreamExecutionEnvironment = {
- new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
- }
-
- /**
- * Creates a local execution environment. The local execution environment will run the program in
- * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
- * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
- */
- def createLocalEnvironment(
- parallelism: Int = Runtime.getRuntime.availableProcessors()):
- StreamExecutionEnvironment = {
- new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
- }
-
- /**
- * Creates a remote execution environment. The remote environment sends (parts of) the program to
- * a cluster for execution. Note that all file paths used in the program must be accessible from
- * the cluster. The execution will use the cluster's default parallelism, unless the
- * parallelism is set explicitly via [[StreamExecutionEnvironment.setParallelism()]].
- *
- * @param host The host name or address of the master (JobManager),
- * where the program should be executed.
- * @param port The port of the master (JobManager), where the program should be executed.
- * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
- * program uses
- * user-defined functions, user-defined input formats, or any libraries,
- * those must be
- * provided in the JAR files.
- */
- def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
- StreamExecutionEnvironment = {
- new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
- }
-
- /**
- * Creates a remote execution environment. The remote environment sends (parts of) the program
- * to a cluster for execution. Note that all file paths used in the program must be accessible
- * from the cluster. The execution will use the specified parallelism.
- *
- * @param host The host name or address of the master (JobManager),
- * where the program should be executed.
- * @param port The port of the master (JobManager), where the program should be executed.
- * @param parallelism The parallelism to use during the execution.
- * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
- * program uses
- * user-defined functions, user-defined input formats, or any libraries,
- * those must be
- * provided in the JAR files.
- */
- def createRemoteEnvironment(
- host: String,
- port: Int,
- parallelism: Int,
- jarFiles: String*): StreamExecutionEnvironment = {
- val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
- javaEnv.setParallelism(parallelism)
- new StreamExecutionEnvironment(javaEnv)
- }
-}
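
For orientation, a minimal sketch of how the Scala StreamExecutionEnvironment
above was typically driven (a sketch assuming the 0.10-era Scala API; the data
and job name are illustrative):

    import org.apache.flink.streaming.api.scala._

    object WordCountSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)
        // draw exactly-once checkpoints every 5 seconds
        env.enableCheckpointing(5000)

        val text = env.fromElements("to be", "or not to be")
        // print() comes from DataStream, defined elsewhere in this commit
        text.print()

        env.execute("word count sketch")
      }
    }
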
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
deleted file mode 100644
index 93b91ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.reflect.ClassTag
-
-import scala.collection.JavaConverters._
-
-/**
- * A [[WindowedStream]] represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission
- * is triggered based on a [[Trigger]].
- *
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * If an [[org.apache.flink.streaming.api.windowing.evictors.Evictor]] is specified it will
- * be used to evict elements from the window after evaluation was triggered by the [[Trigger]]
- * but before the actual evaluation of the window. When using an evictor window performance will
- * degrade significantly, since pre-aggregation of window results cannot be used.
- *
- * Note that the [[WindowedStream]] is purely an API construct; during runtime
- * the [[WindowedStream]] will be collapsed together with the
- * [[KeyedStream]] and the operation over the window into one single operation.
- *
- * @tparam T The type of elements in the stream.
- * @tparam K The type of the key by which elements are grouped.
- * @tparam W The type of [[Window]] that the
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]
- * assigns the elements to.
- */
-class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
-
- /**
- * Sets the [[Trigger]] that should be used to trigger window emission.
- */
- def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
- javaStream.trigger(trigger)
- this
- }
-
- /**
- * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
- *
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
- javaStream.evictor(evictor)
- this
- }
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- *
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
- * interval, so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- def reduce(function: ReduceFunction[T]): DataStream[T] = {
- javaStream.reduce(clean(function))
- }
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- *
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
- * interval, so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- def reduce(function: (T, T) => T): DataStream[T] = {
- if (function == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
- val cleanFun = clean(function)
- val reducer = new ReduceFunction[T] {
- def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
- }
- reduce(reducer)
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the fold function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- def fold[R: TypeInformation: ClassTag](
- initialValue: R,
- function: FoldFunction[T,R]): DataStream[R] = {
- if (function == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
-
- val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
- javaStream.fold(initialValue, function, resultType)
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the fold function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
- if (function == null) {
- throw new NullPointerException("Fold function must not be null.")
- }
- val cleanFun = clean(function)
- val folder = new FoldFunction[T,R] {
- def fold(acc: R, v: T) = {
- cleanFun(acc, v)
- }
- }
- fold(initialValue, folder)
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, W]): DataStream[R] = {
- javaStream.apply(clean(function), implicitly[TypeInformation[R]])
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](
- function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
- if (function == null) {
- throw new NullPointerException("WindowApply function must not be null.")
- }
-
- val cleanedFunction = clean(function)
- val applyFunction = new WindowFunction[T, R, K, W] {
- def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
- cleanedFunction(key, window, elements.asScala, out)
- }
- }
- javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](
- preAggregator: ReduceFunction[T],
- function: WindowFunction[T, R, K, W]): DataStream[R] = {
- javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- def apply[R: TypeInformation: ClassTag](
- preAggregator: (T, T) => T,
- function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
- if (preAggregator == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
- if (function == null) {
- throw new NullPointerException("WindowApply function must not be null.")
- }
-
- val cleanReducer = clean(preAggregator)
- val reducer = new ReduceFunction[T] {
- def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
- }
-
- val cleanApply = clean(function)
- val applyFunction = new WindowFunction[T, R, K, W] {
- def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
- cleanApply(key, window, elements.asScala, out)
- }
- }
- javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
- }
-
- // ------------------------------------------------------------------------
- // Aggregations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies an aggregation that gives the maximum of the elements in the window at
- * the given position.
- */
- def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
- /**
- * Applies an aggregation that gives the maximum of the elements in the window at
- * the given field.
- */
- def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
- /**
- * Applies an aggregation that gives the minimum of the elements in the window at
- * the given position.
- */
- def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
- /**
- * Applies an aggregation that gives the minimum of the elements in the window at
- * the given field.
- */
- def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
- /**
- * Applies an aggregation that sums the elements in the window at the given position.
- */
- def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
- /**
- * Applies an aggregation that sums the elements in the window at the given field.
- */
- def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
- /**
- * Applies an aggregation that gives the maximum element of the window by
- * the given position. If several elements compare equal, the first one is returned.
- */
- def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
- position)
-
- /**
- * Applies an aggregation that gives the maximum element of the window by
- * the given field. If several elements compare equal, the first one is returned.
- */
- def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
- field)
-
- /**
- * Applies an aggregation that gives the minimum element of the window by
- * the given position. If several elements compare equal, the first one is returned.
- */
- def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
- position)
-
- /**
- * Applies an aggregation that gives the minimum element of the window by
- * the given field. If several elements compare equal, the first one is returned.
- */
- def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
- field)
-
- private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
- val position = fieldNames2Indices(getInputType(), Array(field))(0)
- aggregate(aggregationType, position)
- }
-
- def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
- val jStream = javaStream.asInstanceOf[JavaWStream[Product, K, W]]
-
- val reducer = aggregationType match {
- case AggregationType.SUM =>
- new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
-
- case _ =>
- new ComparableAggregator(
- position,
- jStream.getInputType,
- aggregationType,
- true,
- jStream.getExecutionEnvironment.getConfig)
- }
-
- new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
- }
-
- /**
- * Gets the input type.
- */
- private def getInputType(): TypeInformation[T] = javaStream.getInputType
-}
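
As a usage sketch for the windowed operations above (assuming the 0.10-era
window assigners TumblingTimeWindows and Time from
org.apache.flink.streaming.api.windowing; the input data is illustrative):

    import java.util.concurrent.TimeUnit

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2), ("a", 3))

    // Per-key tumbling windows of 5 seconds; the lambda variant of
    // reduce() above pre-aggregates eagerly, so only one element per
    // key and window is kept in state.
    val counts = words
      .keyBy(0)
      .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
      .reduce((l, r) => (l._1, l._2 + r._2))
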
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
deleted file mode 100644
index d66cfdb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.function
-
-import org.apache.flink.api.common.functions.RichFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.state.OperatorState
-
-/**
- * Trait implementing the functionality necessary to apply stateful functions in
- * RichFunctions without exposing the OperatorStates to the user. The user should
- * call the applyWithState method in their own RichFunction implementation.
- */
-trait StatefulFunction[I, O, S] extends RichFunction {
-
- var state: OperatorState[S] = _
- val stateType: TypeInformation[S]
-
- def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
- val (o, s: Option[S]) = fun(in, Option(state.value()))
- s match {
- case Some(v) => state.update(v)
- case None => state.update(null.asInstanceOf[S])
- }
- o
- }
-
- override def open(c: Configuration) = {
- state = getRuntimeContext().getKeyValueState[S]("state", stateType, null.asInstanceOf[S])
- }
-}
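
A sketch of how this trait was meant to be mixed into a user function (the
CountingMap class is hypothetical; it emits a running per-key element count
via applyWithState and must run on a keyed stream, since open() asks the
runtime context for key/value state):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.StatefulFunction

    class CountingMap[T] extends RichMapFunction[T, Long]
        with StatefulFunction[T, Long, Long] {

      // TypeInformation comes from the Scala API's implicit macro
      val stateType: TypeInformation[Long] = createTypeInformation[Long]

      override def map(in: T): Long =
        applyWithState(in, (_, count: Option[Long]) => {
          val next = count.getOrElse(0L) + 1L
          (next, Some(next)) // emit the count and store it back as state
        })
    }
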
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
deleted file mode 100644
index e668064..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api
-
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
-import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
-import language.implicitConversions
-
-package object scala {
- // We have this here so that we always have generated TypeInformationS when
- // using the Scala API
- implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
-
- implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
- new DataStream[R](javaStream)
-
- implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
- KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
-
- implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] =
- new SplitStream[R](javaStream)
-
- implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
- ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
-
- implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
- StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
-
- private[flink] def fieldNames2Indices(
- typeInfo: TypeInformation[_],
- fields: Array[String]): Array[Int] = {
- typeInfo match {
- case ti: CaseClassTypeInfo[_] =>
- val result = ti.getFieldIndices(fields)
-
- if (result.contains(-1)) {
- throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
- "' are not valid for '" + ti.toString + "'.")
- }
-
- result
-
- case _ =>
- throw new UnsupportedOperationException("Specifying fields by name is only " +
- "supported on Case Classes (for now).")
- }
- }
-
- def createTuple2TypeInformation[T1, T2](
- t1: TypeInformation[T1],
- t2: TypeInformation[T2]) : TypeInformation[(T1, T2)] =
- apiTupleCreator[T1, T2](t1, t2)
-}
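
The practical effect of this package object is that a single wildcard import
gives user programs both the TypeInformation macro and the Java-to-Scala
stream conversions, e.g. (a sketch; the tuple data is illustrative):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // createTypeInformation[(String, Int)] is materialized implicitly here,
    // producing a CaseClassTypeInfo for the tuple type
    val stream: DataStream[(String, Int)] = env.fromElements(("flink", 1))
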
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
deleted file mode 100644
index 0c60719..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- //Strip the parentheses from the expected text like output
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
- .replaceAll("[\\\\(\\\\)]", ""), resultPath);
- }
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
- throws Exception {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
deleted file mode 100644
index a2a78b7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.junit.Ignore;
-
-@Ignore
-//This test sometimes fails, most likely due to the behaviour
-//of the socket. Disabled for now.
-public class SocketOutputFormatITCase extends SocketOutputTestBase {
-
- @Override
- protected void testProgram() throws Exception {
- OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
deleted file mode 100644
index 731222e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.StateTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class StatefulFunctionITCase extends StreamingProgramTestBase {
-
- @Override
- protected void testProgram() throws Exception {
- StateTestPrograms.testStatefulFunctions();
- }
-}
-
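StateTestPrograms.testStatefulFunctions() is only referenced here, not shown. A minimal sketch of what a stateful Scala program of this era can look like, assuming the keyed mapWithState helper of the Scala DataStream API (the actual programs under test may differ):

import org.apache.flink.streaming.api.scala._

object StatefulSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("a", 1), ("a", 2), ("b", 3))
      .keyBy(_._1)
      .mapWithState { (in: (String, Int), count: Option[Int]) =>
        val seen = count.getOrElse(0) + 1
        ((in._1, seen), Some(seen)) // emit (key, occurrences so far), update state
      }
      .print()

    env.execute("Stateful sketch")
  }
}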
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
deleted file mode 100644
index 530ba67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class TextOutputFormatITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
- }
-
-}
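TextOutputFormatITCase checks the file produced by wordCountToText against WordCountData.STREAMING_COUNTS_AS_TUPLES, i.e. running per-key counts written as tuples (a streaming sum emits one updated tuple per input record, unlike a batch count). A sketch of a pipeline producing that shape (the path and input are illustrative; the test itself writes to a temp directory):

import org.apache.flink.streaming.api.scala._

object WordCountToTextSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1))
      .keyBy(0)                   // key by the word
      .sum(1)                     // running count per word, one tuple per input record
      .writeAsText("/tmp/result") // illustrative path

    env.execute("Word count to text")
  }
}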
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
deleted file mode 100644
index 7da7bc3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.RichReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
-import org.apache.flink.streaming.runtime.operators.windowing._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.util.Collector
-
-import org.junit.Assert._
-import org.junit.{Ignore, Test}
-
-class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
- /**
- * These tests ensure that the fast aligned time windows operator is used if the
- * conditions are right.
- *
- * TODO: update once we have an optimized aligned time windows operator for all-windows
- */
- @Ignore
- @Test
- def testFastTimeWindows(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .windowAll(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .reduce(reducer)
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
-
- val window2 = source
- .keyBy(0)
- .windowAll(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
- def apply(
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
- }
-
- @Test
- def testNonEvicting(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .windowAll(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer)
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
- val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
- assertTrue(
- winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
- val window2 = source
- .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
- def apply(
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
- val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
- assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
- }
-
- @Test
- def testEvicting(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .windowAll(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
- .reduce(reducer)
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
- val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
- assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
- assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-
-
- val window2 = source
- .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(1000))
- .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
- def apply(
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
- val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
- assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
- }
-
- @Test
- def testPreReduce(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
- assertTrue(
- winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
- val window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
- assertTrue(
- winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
- }
-
-}
-
-// ------------------------------------------------------------------------
-// UDFs
-// ------------------------------------------------------------------------
-
-class DummyReducer extends RichReduceFunction[(String, Int)] {
- def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
- value1
- }
-}
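Taken together, the assertions above pin down the translation rules: a reduce on a non-evicting all-window goes through a pre-aggregating buffer, a raw apply keeps every element in a heap buffer, and adding an evictor switches to the evicting operator. The two user-facing shapes, condensed into a sketch (same API as the deleted test; the reduce lambda stands in for DummyReducer):

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingTimeWindows, TumblingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowTranslationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements(("hello", 1), ("hello", 2))

    // reduce(...) pre-aggregates: the test asserts this path translates to
    // NonKeyedWindowOperator with a PreAggregatingHeapWindowBuffer.
    source
      .windowAll(SlidingTimeWindows.of(
        Time.of(1, TimeUnit.SECONDS),
        Time.of(100, TimeUnit.MILLISECONDS)))
      .trigger(CountTrigger.of(100))
      .reduce((a, b) => (a._1, a._2 + b._2))

    // apply(...) with an evictor buffers every element: the test asserts this
    // path translates to EvictingNonKeyedWindowOperator with a HeapWindowBuffer.
    source
      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      .trigger(CountTrigger.of(100))
      .evictor(CountEvictor.of(1000))
      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
        def apply(window: TimeWindow,
                  values: java.lang.Iterable[(String, Int)],
                  out: Collector[(String, Int)]): Unit = { }
      })

    env.execute("Window translation sketch")
  }
}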
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
deleted file mode 100644
index 3c1e9c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.TimestampExtractor
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.mutable
-
-class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testCoGroup(): Unit = {
- CoGroupJoinITCase.testResults = mutable.MutableList()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
-
- val source1 = env.addSource(new SourceFunction[(String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
- ctx.collect(("a", 0))
- ctx.collect(("a", 1))
- ctx.collect(("a", 2))
- ctx.collect(("b", 3))
- ctx.collect(("b", 4))
- ctx.collect(("b", 5))
- ctx.collect(("a", 6))
- ctx.collect(("a", 7))
- ctx.collect(("a", 8))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
-
- val source2 = env.addSource(new SourceFunction[(String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
- ctx.collect(("a", 0))
- ctx.collect(("a", 1))
- ctx.collect(("b", 3))
- ctx.collect(("c", 6))
- ctx.collect(("c", 7))
- ctx.collect(("c", 8))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
-
- source1.coGroup(source2)
- .where(_._1)
- .equalTo(_._1)
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
- "F:" + first.mkString("") + " S:" + second.mkString("")
- }
- .addSink(new SinkFunction[String]() {
- def invoke(value: String) {
- CoGroupJoinITCase.testResults += value
- }
- })
-
- env.execute("CoGroup Test")
-
- val expectedResult = mutable.MutableList(
- "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
- "F:(b,3)(b,4)(b,5) S:(b,3)",
- "F:(a,6)(a,7)(a,8) S:",
- "F: S:(c,6)(c,7)(c,8)")
-
- assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
- }
-
- @Test
- def testJoin(): Unit = {
- CoGroupJoinITCase.testResults = mutable.MutableList()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
-
- val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
- ctx.collect(("a", "x", 0))
- ctx.collect(("a", "y", 1))
- ctx.collect(("a", "z", 2))
-
- ctx.collect(("b", "u", 3))
- ctx.collect(("b", "w", 5))
-
- ctx.collect(("a", "i", 6))
- ctx.collect(("a", "j", 7))
- ctx.collect(("a", "k", 8))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
-
- val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
- ctx.collect(("a", "u", 0))
- ctx.collect(("a", "w", 1))
-
- ctx.collect(("b", "i", 3))
- ctx.collect(("b", "k", 5))
-
- ctx.collect(("a", "x", 6))
- ctx.collect(("a", "z", 8))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
-
- source1.join(source2)
- .where(_._1)
- .equalTo(_._1)
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .apply( (l, r) => l.toString + ":" + r.toString)
- .addSink(new SinkFunction[String]() {
- def invoke(value: String) {
- CoGroupJoinITCase.testResults += value
- }
- })
-
- env.execute("Join Test")
-
- val expectedResult = mutable.MutableList(
- "(a,x,0):(a,u,0)",
- "(a,x,0):(a,w,1)",
- "(a,y,1):(a,u,0)",
- "(a,y,1):(a,w,1)",
- "(a,z,2):(a,u,0)",
- "(a,z,2):(a,w,1)",
- "(b,u,3):(b,i,3)",
- "(b,u,3):(b,k,5)",
- "(b,w,5):(b,i,3)",
- "(b,w,5):(b,k,5)",
- "(a,i,6):(a,x,6)",
- "(a,i,6):(a,z,8)",
- "(a,j,7):(a,x,6)",
- "(a,j,7):(a,z,8)",
- "(a,k,8):(a,x,6)",
- "(a,k,8):(a,z,8)")
-
- assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
- }
-
- @Test
- def testSelfJoin(): Unit = {
- CoGroupJoinITCase.testResults = mutable.MutableList()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
-
- val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
- ctx.collect(("a", "x", 0))
- ctx.collect(("a", "y", 1))
- ctx.collect(("a", "z", 2))
-
- ctx.collect(("b", "u", 3))
- ctx.collect(("b", "w", 5))
-
- ctx.collect(("a", "i", 6))
- ctx.collect(("a", "j", 7))
- ctx.collect(("a", "k", 8))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
-
- source1.join(source1)
- .where(_._1)
- .equalTo(_._1)
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .apply( (l, r) => l.toString + ":" + r.toString)
- .addSink(new SinkFunction[String]() {
- def invoke(value: String) {
- CoGroupJoinITCase.testResults += value
- }
- })
-
- env.execute("Self-Join Test")
-
- val expectedResult = mutable.MutableList(
- "(a,x,0):(a,x,0)",
- "(a,x,0):(a,y,1)",
- "(a,x,0):(a,z,2)",
- "(a,y,1):(a,x,0)",
- "(a,y,1):(a,y,1)",
- "(a,y,1):(a,z,2)",
- "(a,z,2):(a,x,0)",
- "(a,z,2):(a,y,1)",
- "(a,z,2):(a,z,2)",
- "(b,u,3):(b,u,3)",
- "(b,u,3):(b,w,5)",
- "(b,w,5):(b,u,3)",
- "(b,w,5):(b,w,5)",
- "(a,i,6):(a,i,6)",
- "(a,i,6):(a,j,7)",
- "(a,i,6):(a,k,8)",
- "(a,j,7):(a,i,6)",
- "(a,j,7):(a,j,7)",
- "(a,j,7):(a,k,8)",
- "(a,k,8):(a,i,6)",
- "(a,k,8):(a,j,7)",
- "(a,k,8):(a,k,8)")
-
- assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
- }
-
-}
-
-
-object CoGroupJoinITCase {
- private var testResults: mutable.MutableList[String] = null
-
- private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
- def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
- element._2
- }
-
- def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
- element._2 - 1
- }
-
- def getCurrentWatermark: Long = {
- Long.MinValue
- }
- }
-
- private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] {
- def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = {
- element._3
- }
-
- def extractWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
- element._3 - 1
- }
-
- def getCurrentWatermark: Long = {
- Long.MinValue
- }
- }
-}
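The extractors above are what make the expected windows deterministic: timestamps come from the tuple's last field and each element emits a watermark one millisecond behind it, so a 3 ms tumbling window [0, 3) closes exactly when an element with timestamp 3 arrives. A condensed sketch of that event-time setup on its own (input values are illustrative):

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.TimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    env.fromElements(("a", 0), ("a", 1), ("a", 2), ("b", 3), ("b", 4))
      .assignTimestamps(new TimestampExtractor[(String, Int)] {
        def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long =
          element._2                  // the tuple carries its own event time
        def extractWatermark(element: (String, Int), currentTimestamp: Long): Long =
          element._2 - 1              // watermark trails each timestamp by 1 ms
        def getCurrentWatermark: Long = Long.MinValue
      })
      .keyBy(_._1)
      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .reduce((a, b) => (a._1, a._2 + b._2)) // one aggregate per key and 3 ms pane
      .print()

    env.execute("Event-time window sketch")
  }
}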