Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:30 UTC

[14/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
deleted file mode 100644
index e953696..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.Objects
-import java.util.Objects._
-
-import com.esotericsoftware.kryo.Serializer
-import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.runtime.state.StateBackend
-import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.types.StringValue
-import org.apache.flink.util.SplittableIterator
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import _root_.scala.language.implicitConversions
-
-class StreamExecutionEnvironment(javaEnv: JavaEnv) {
-
-  /**
-   * Gets the config object.
-   */
-  def getConfig = javaEnv.getConfig
-
-  /**
-   * Sets the parallelism for operations executed through this environment.
-   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
-   * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream#setParallelism(int)]].
-   */
-  def setParallelism(parallelism: Int): Unit = {
-    javaEnv.setParallelism(parallelism)
-  }
-
-  /**
-   * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
-   */
-  def getParallelism = javaEnv.getParallelism
-
-  /**
-   * Sets the maximum time frequency (milliseconds) for the flushing of the
-   * output buffers. By default the output buffers flush frequently to provide
-   * low latency and to aid a smooth developer experience. Setting the parameter
-   * can result in three logical modes:
-   *
-   * <ul>
-   *   <li>A positive value triggers flushing periodically with that interval (in milliseconds)</li>
-   *   <li>0 triggers flushing after every record thus minimizing latency</li>
-   *   <li>-1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
-   * </ul>
-   */
-  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
-    javaEnv.setBufferTimeout(timeoutMillis)
-    this
-  }
-
-  /**
-   * Gets the default buffer timeout set for this environment
-   */
-  def getBufferTimeout = javaEnv.getBufferTimeout
-
-  /**
-   * Disables operator chaining for streaming operators. Operator chaining
-   * allows non-shuffle operations to be co-located in the same thread fully
-   * avoiding serialization and de-serialization.
-   *
-   */
-  def disableOperatorChaining(): StreamExecutionEnvironment = {
-    javaEnv.disableOperatorChaining()
-    this
-  }
-
-  // ------------------------------------------------------------------------
-  //  Checkpointing Settings
-  // ------------------------------------------------------------------------
-  /**
-   * Enables checkpointing for the streaming job. The distributed state of the streaming
-   * dataflow will be periodically snapshotted. In case of a failure, the streaming
-   * dataflow will be restarted from the latest completed checkpoint.
-   *
-   * The job draws checkpoints periodically, in the given interval. The state will be
-   * stored in the configured state backend.
-   *
-   * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
-   * the moment. If the "force" parameter is set to true, the system will execute the
-   * job nonetheless.
-   *
-   * @param interval
-   *     Time interval between state checkpoints in millis.
-   * @param mode
-   *     The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
-   * @param force
-   *           If true checkpointing will be enabled for iterative jobs as well.
-   */
-  @deprecated
-  def enableCheckpointing(interval : Long,
-                          mode: CheckpointingMode,
-                          force: Boolean) : StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing(interval, mode, force)
-    this
-  }
-
-  /**
-   * Enables checkpointing for the streaming job. The distributed state of the streaming
-   * dataflow will be periodically snapshotted. In case of a failure, the streaming
-   * dataflow will be restarted from the latest completed checkpoint.
-   *
-   * The job draws checkpoints periodically, in the given interval. The system uses the
-   * given [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once").
-   * The state will be stored in the configured state backend.
-   *
-   * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
-   * the moment. For that reason, iterative jobs will not be started if used
-   * with enabled checkpointing. To override this mechanism, use the 
-   * [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
-   *
-   * @param interval 
-   *     Time interval between state checkpoints in milliseconds.
-   * @param mode 
-   *     The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
-   */
-  def enableCheckpointing(interval : Long,
-                          mode: CheckpointingMode) : StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing(interval, mode)
-    this
-  }
-
-  /**
-   * Enables checkpointing for the streaming job. The distributed state of the streaming
-   * dataflow will be periodically snapshotted. In case of a failure, the streaming
-   * dataflow will be restarted from the latest completed checkpoint.
-   *
-   * The job draws checkpoints periodically, in the given interval. The program will use
-   * [[CheckpointingMode.EXACTLY_ONCE]] mode. The state will be stored in the
-   * configured state backend.
-   *
-   * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
-   * the moment. For that reason, iterative jobs will not be started if used
-   * with enabled checkpointing. To override this mechanism, use the 
-   * [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
-   *
-   * @param interval 
-   *           Time interval between state checkpoints in milliseconds.
-   */
-  def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
-    enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
-  }
-
-  /**
-   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
-   * operator states. The time interval between state checkpoints is specified in millis.
-   *
-   * Setting this option assumes that the job is used in production. Unless stated
-   * otherwise by calling the [[setNumberOfExecutionRetries(int)]] method, the job
-   * will be resubmitted to the cluster indefinitely in case of failure.
-   */
-  def enableCheckpointing() : StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing()
-    this
-  }
-  
-  def getCheckpointingMode = javaEnv.getCheckpointingMode()
-
-  /**
-   * Sets the state backend that describes how to store and checkpoint operator state.
-   * It defines in what form the key/value state, accessible from operations on
-   * [[KeyedStream]] is maintained (heap, managed memory, externally), and where state
-   * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
-   * functions (implementing the interface
-   * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]]).
-   *
-   * <p>The [[org.apache.flink.streaming.api.state.memory.MemoryStateBackend]] for example
-   * maintains the state in heap memory, as objects. It is lightweight without extra 
-   * dependencies, but can checkpoint only small states (some counters).
-   *
-   * <p>In contrast, the [[org.apache.flink.streaming.api.state.filesystem.FsStateBackend]]
-   * stores checkpoints of the state (also maintained as heap objects) in files. When using
-   * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
-   * that state is not lost upon failures of individual nodes and that the entire streaming
-   * program can be executed highly available and strongly consistent (assuming that Flink
-   * is run in high-availability mode).
-   */
-  def setStateBackend(backend: StateBackend[_]): StreamExecutionEnvironment = {
-    javaEnv.setStateBackend(backend)
-    this
-  }
-
-  /**
-   * Returns the state backend that defines how to store and checkpoint state.
-   */
-  def getStateBackend: StateBackend[_] = javaEnv.getStateBackend()
-  
-  /**
-   * Sets the number of times that failed tasks are re-executed. A value of zero
-   * effectively disables fault tolerance. A value of "-1" indicates that the system
-   * default value (as defined in the configuration) should be used.
-   */
-  def setNumberOfExecutionRetries(numRetries: Int): Unit = {
-    javaEnv.setNumberOfExecutionRetries(numRetries)
-  }
-
-  /**
-   * Gets the number of times the system will try to re-execute failed tasks. A value
-   * of "-1" indicates that the system default value (as defined in the configuration)
-   * should be used.
-   */
-  def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
-
-  /**
-   * Sets the delay between consecutive attempts to re-execute failed tasks.
-   * A value of "-1" indicates that the system default value (as defined in
-   * the configuration) should be used.
-   */
-  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
-    javaEnv.setExecutionRetryDelay(executionRetryDelay)
-  }
-
-  /**
-   * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
-   * A value of "-1" indicates that the system default value (as defined
-   * in the configuration) should be used.
-   */
-  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
-
-  // --------------------------------------------------------------------------------------------
-  // Registry for types and serializers
-  // --------------------------------------------------------------------------------------------
-  /**
-   * Adds a new Kryo default serializer to the Runtime.
-   * <p/>
-   * Note that the serializer instance must be serializable (as defined by
-   * java.io.Serializable), because it may be distributed to the worker nodes
-   * by java serialization.
-   *
-   * @param type
-   * The class of the types serialized with the given serializer.
-   * @param serializer
-   * The serializer to use.
-   */
-  def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
-      `type`: Class[_],
-      serializer: T)
-    : Unit = {
-    javaEnv.addDefaultKryoSerializer(`type`, serializer)
-  }
-
-  /**
-   * Adds a new Kryo default serializer to the Runtime.
-   *
-   * @param type
-   * The class of the types serialized with the given serializer.
-   * @param serializerClass
-   * The class of the serializer to use.
-   */
-  def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]) {
-    javaEnv.addDefaultKryoSerializer(`type`, serializerClass)
-  }
-
-  /**
-   * Registers the given type with the serializer at the [[KryoSerializer]].
-   *
-   * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
-   * because it may be distributed to the worker nodes by java serialization.
-   */
-  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
-      clazz: Class[_],
-      serializer: T)
-    : Unit = {
-    javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
-  }
-
-  /**
-   * Registers the given type with the serializer at the [[KryoSerializer]].
-   */
-  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
-    javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
-  }
-
-  /**
-   * Registers the given type with the serialization stack. If the type is eventually
-   * serialized as a POJO, then the type is registered with the POJO serializer. If the
-   * type ends up being serialized with Kryo, then it will be registered at Kryo to make
-   * sure that only tags are written.
-   *
-   */
-  def registerType(typeClass: Class[_]) {
-    javaEnv.registerType(typeClass)
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Time characteristic
-  // --------------------------------------------------------------------------------------------
-  /**
-   * Sets the time characteristic for all streams created from this environment, e.g., processing
-   * time, event time, or ingestion time.
-   *
-   * If you set the characteristic to IngestionTime or EventTime this will set a default
-   * watermark update interval of 200 ms. If this is not applicable for your application
-   * you should change it using
-   * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
-   *
-   * @param characteristic The time characteristic.
-   */
-  def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
-    javaEnv.setStreamTimeCharacteristic(characteristic)
-  }
-
-  /**
-   * Gets the time characteristic.
-   *
-   * @see #setStreamTimeCharacteristic
-   *
-   * @return The time characteristic.
-   */
-  def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
-
-  // --------------------------------------------------------------------------------------------
-  // Data stream creations
-  // --------------------------------------------------------------------------------------------
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers. This source is a parallel source.
-   * If you manually set the parallelism to `1` the emitted elements are in order.
-   */
-  def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to))
-      .asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a DataStream that contains the given elements. The elements must all be of the
-   * same type.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
-   * because the framework may move the elements into the cluster if needed.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
-    require(data != null, "Data must not be null.")
-    val typeInfo = implicitly[TypeInformation[T]]
-
-    javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given [[Iterator]].
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a parallelism of one.
-   */
-  def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromCollection(data.asJava, typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given [[SplittableIterator]].
-   */
-  def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
-  DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromParallelCollection(data, typeInfo)
-  }
-
-  /**
-   * Creates a DataStream that represents the Strings produced by reading the
-   * given file line wise. The file will be read with the system's default
-   * character set.
-   *
-   */
-  def readTextFile(filePath: String): DataStream[String] =
-    javaEnv.readTextFile(filePath)
-
-  /**
-   * Creates a data stream that represents the Strings produced by reading the given file
-   * line wise. The character set with the given name will be used to read the files.
-   */
-  def readTextFile(filePath: String, charsetName: String): DataStream[String] =
-    javaEnv.readTextFile(filePath, charsetName)
-
-  /**
-   * Creates a data stream that represents the strings produced by reading the given file
-   * line wise. This method is similar to the standard text file reader, but it produces
-   * a data stream with mutable StringValue objects, rather than Java Strings.
-   * StringValues can be used to tune implementations to be less object and garbage
-   * collection heavy. The file will be read with the system's default character set.
-   */
-  def readTextFileWithValue(filePath: String): DataStream[StringValue] =
-      javaEnv.readTextFileWithValue(filePath)
-
-  /**
-   * Creates a data stream that represents the strings produced by reading the given file
-   * line wise. This method is similar to the standard text file reader, but it produces
-   * a data stream with mutable StringValue objects, rather than Java Strings.
-   * StringValues can be used to tune implementations to be less object and garbage
-   * collection heavy. The boolean flag indicates whether to skip lines that cannot
-   * be read with the given character set.
-   */
-  def readTextFileWithValue(filePath: String, charsetName : String, skipInvalidLines : Boolean):
-    DataStream[StringValue] =
-    javaEnv.readTextFileWithValue(filePath, charsetName, skipInvalidLines)
-
-  /**
-   * Reads the given file with the given input format. The file path should be passed
-   * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
-   */
-  def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
-    DataStream[T] =
-    javaEnv.readFile(inputFormat, filePath)
-
-  /**
-   * Creates a data stream that represents the primitive type produced by reading the given file
-   * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
-   * "hdfs://host:port/file/path").
-   */
-  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String,
-    delimiter: String = "\n", typeClass: Class[T]): DataStream[T] =
-    javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
-
-  /**
-   * Creates a DataStream that contains the contents of files created while the
-   * system watches the given path. The files will be read with the system's
-   * default character set. The user can specify the monitoring interval in
-   * milliseconds and the way file modifications are handled. By default it
-   * checks only for new files every 100 milliseconds.
-   *
-   */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType = 
-    WatchType.ONLY_NEW_FILES): DataStream[String] =
-    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
-
-  /**
-   * Creates a new DataStream that contains the strings received infinitely
-   * from socket. Received strings are decoded by the system's default
-   * character set. The maximum retry interval is specified in seconds; in case
-   * of a temporary service outage, reconnection is initiated every second.
-   */
-  def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
-    DataStream[String] =
-    javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
-
-  /**
-   * Generic method to create an input data stream with a specific input format.
-   * Since all data streams need specific information about their types, this method needs to
-   * determine the type of the data produced by the input format. It will attempt to determine the
-   * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
-   */
-  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
-    javaEnv.createInput(inputFormat)
-
-  /**
-   * Create a DataStream using a user defined source function for arbitrary
-   * source functionality. By default sources have a parallelism of 1. 
-   * To enable parallel execution, the user defined source should implement 
-   * ParallelSourceFunction or extend RichParallelSourceFunction. 
-   * In these cases the resulting source will have the parallelism of the environment. 
-   * To change this afterwards call DataStreamSource.setParallelism(int)
-   *
-   */
-  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
-    require(function != null, "Function must not be null.")
-    val cleanFun = scalaClean(function)
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun).returns(typeInfo)
-  }
-
-  /**
-   * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
-   *
-   */
-  def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
-    require(function != null, "Function must not be null.")
-    val sourceFunction = new SourceFunction[T] {
-      val cleanFun = scalaClean(function)
-      override def run(ctx: SourceContext[T]) {
-        cleanFun(ctx)
-      }
-      override def cancel() = {}
-    }
-    addSource(sourceFunction)
-  }
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with a generated
-   * default name.
-   *
-   */
-  def execute() = javaEnv.execute()
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with the provided name
-   *
-   */
-  def execute(jobName: String) = javaEnv.execute(jobName)
-
-  /**
-   * Creates the plan with which the system will execute the program, and
-   * returns it as a String using a JSON representation of the execution data
-   * flow graph. Note that this needs to be called, before the plan is
-   * executed.
-   *
-   */
-  def getExecutionPlan = javaEnv.getExecutionPlan
-
-  /**
-   * Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job.
-   *
-   * @return The StreamGraph representing the transformations
-   */
-  def getStreamGraph = javaEnv.getStreamGraph
-
-  /**
-   * Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]]
-   * @return The encased ExecutionEnvironment
-   */
-  def getWrappedStreamExecutionEnvironment = javaEnv
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
-   */
-  private[flink] def scalaClean[F <: AnyRef](f: F): F = {
-    if (getConfig.isClosureCleanerEnabled) {
-      ClosureCleaner.clean(f, true)
-    } else {
-      ClosureCleaner.ensureSerializable(f)
-    }
-    f
-  }
-}
-
-object StreamExecutionEnvironment {
-
-  /**
-   * Sets the default parallelism that will be used for the local execution
-   * environment created by [[createLocalEnvironment()]].
-   *
-   * @param parallelism
-   * The parallelism to use as the default local parallelism.
-   */
-  def setDefaultLocalParallelism(parallelism: Int) : Unit =
-    JavaEnv.setDefaultLocalParallelism(parallelism)
-
-  /**
-   * Creates an execution environment that represents the context in which the program is
-   * currently executed. If the program is invoked standalone, this method returns a local
-   * execution environment. If the program is invoked from within the command line client
-   * to be submitted to a cluster, this method returns the execution environment of this cluster.
-   */
-  def getExecutionEnvironment: StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
-  }
-
-  /**
-   * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
-   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
-   */
-  def createLocalEnvironment(
-    parallelism: Int =  Runtime.getRuntime.availableProcessors()):
-  StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program to
-   * a cluster for execution. Note that all file paths used in the program must be accessible from
-   * the cluster. The execution will use the cluster's default parallelism, unless the
-   * parallelism is set explicitly via [[StreamExecutionEnvironment.setParallelism()]].
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses
-   *                 user-defined functions, user-defined input formats, or any libraries,
-   *                 those must be
-   *                 provided in the JAR files.
-   */
-  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
-  StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program
-   * to a cluster for execution. Note that all file paths used in the program must be accessible
-   * from the cluster. The execution will use the specified parallelism.
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param parallelism The parallelism to use during the execution.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses
-   *                 user-defined functions, user-defined input formats, or any libraries,
-   *                 those must be
-   *                 provided in the JAR files.
-   */
-  def createRemoteEnvironment(
-    host: String,
-    port: Int,
-    parallelism: Int,
-    jarFiles: String*): StreamExecutionEnvironment = {
-    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
-    javaEnv.setParallelism(parallelism)
-    new StreamExecutionEnvironment(javaEnv)
-  }
-}
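For orientation while this file moves, here is a minimal sketch of how the Scala
StreamExecutionEnvironment shown above is typically driven. It only uses calls that
appear in this diff (getExecutionEnvironment, setParallelism, enableCheckpointing,
generateSequence, execute); the job name, parallelism and checkpoint interval are
illustrative, and print() is assumed from DataStream, which is not part of this hunk:

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala._

    object SequenceJob {
      def main(args: Array[String]): Unit = {
        // Obtain the environment via the companion object shown above.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4)
        // Draw an exactly-once checkpoint every 5 seconds.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

        // Parallel source emitting the numbers 0 to 999.
        val numbers: DataStream[Long] = env.generateSequence(0, 999)
        numbers.print()

        env.execute("sequence job")
      }
    }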

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
deleted file mode 100644
index 93b91ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.reflect.ClassTag
-
-import scala.collection.JavaConverters._
-
-/**
- * A [[WindowedStream]] represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission
- * is triggered based on a [[Trigger]].
- *
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * If an [[org.apache.flink.streaming.api.windowing.evictors.Evictor]] is specified it will
- * be used to evict elements from the window after evaluation was triggered by the [[Trigger]]
- * but before the actual evaluation of the window. When using an evictor window performance will
- * degrade significantly, since pre-aggregation of window results cannot be used.
- *
-   * Note that the [[WindowedStream]] is purely an API construct; during runtime
- * the [[WindowedStream]] will be collapsed together with the
- * [[KeyedStream]] and the operation over the window into one single operation.
- *
- * @tparam T The type of elements in the stream.
- * @tparam K The type of the key by which elements are grouped.
- * @tparam W The type of [[Window]] that the
- *           [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]
- *           assigns the elements to.
- */
-class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
-
-  /**
-   * Sets the [[Trigger]] that should be used to trigger window emission.
-   */
-  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
-    javaStream.trigger(trigger)
-    this
-  }
-
-  /**
-   * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
-   *
-   * Note: When using an evictor, window performance will degrade significantly, since
-   * pre-aggregation of window results cannot be used.
-   */
-  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
-    javaStream.evictor(evictor)
-    this
-  }
-
-  // ------------------------------------------------------------------------
-  //  Operations on the keyed windows
-  // ------------------------------------------------------------------------
-
-  /**
-   * Applies a reduce function to the window. The window function is called for each evaluation
-   * of the window for each key individually. The output of the reduce function is interpreted
-   * as a regular non-windowed stream.
-   *
-   * This window will try and pre-aggregate data as much as the window policies permit. For example,
-   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
-   * interval, so a few elements are stored per key (one per slide interval).
-   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-   * aggregation tree.
-   *
-   * @param function The reduce function.
-   * @return The data stream that is the result of applying the reduce function to the window.
-   */
-  def reduce(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduce(clean(function))
-  }
-
-  /**
-   * Applies a reduce function to the window. The window function is called for each evaluation
-   * of the window for each key individually. The output of the reduce function is interpreted
-   * as a regular non-windowed stream.
-   *
-   * This window will try and pre-aggregate data as much as the window policies permit. For example,
-   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
-   * interval, so a few elements are stored per key (one per slide interval).
-   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-   * aggregation tree.
-   *
-   * @param function The reduce function.
-   * @return The data stream that is the result of applying the reduce function to the window.
-   */
-  def reduce(function: (T, T) => T): DataStream[T] = {
-    if (function == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val cleanFun = clean(function)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies the given fold function to each window. The window function is called for each
-   * evaluation of the window for each key individually. The output of the fold function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * @param function The fold function.
-   * @return The data stream that is the result of applying the fold function to the window.
-   */
-  def fold[R: TypeInformation: ClassTag](
-      initialValue: R,
-      function: FoldFunction[T,R]): DataStream[R] = {
-    if (function == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-
-    val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
-    javaStream.fold(initialValue, function, resultType)
-  }
-
-  /**
-   * Applies the given fold function to each window. The window function is called for each
-   * evaluation of the window for each key individually. The output of the fold function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * @param function The fold function.
-   * @return The data stream that is the result of applying the fold function to the window.
-   */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
-    if (function == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    val cleanFun = clean(function)
-    val folder = new FoldFunction[T,R] {
-      def fold(acc: R, v: T) = {
-        cleanFun(acc, v)
-      }
-    }
-    fold(initialValue, folder)
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is called for each
-   * evaluation of the window for each key individually. The output of the window function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * Note that this function requires that all data in the windows is buffered until the window
-   * is evaluated, as the function provides no means of pre-aggregation.
-   *
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, W]): DataStream[R] = {
-    javaStream.apply(clean(function), implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is called for each
-   * evaluation of the window for each key individually. The output of the window function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * Note that this function requires that all data in the windows is buffered until the window
-   * is evaluated, as the function provides no means of pre-aggregation.
-   *
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](
-      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
-    if (function == null) {
-      throw new NullPointerException("WindowApply function must not be null.")
-    }
-
-    val cleanedFunction = clean(function)
-    val applyFunction = new WindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction(key, window, elements.asScala, out)
-      }
-    }
-    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is called for each
-   * evaluation of the window for each key individually. The output of the window function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
-   *
-   * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](
-      preAggregator: ReduceFunction[T],
-      function: WindowFunction[T, R, K, W]): DataStream[R] = {
-    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is called for each
-   * evaluation of the window for each key individually. The output of the window function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
-   *
-   * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](
-      preAggregator: (T, T) => T,
-      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
-    if (preAggregator == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    if (function == null) {
-      throw new NullPointerException("WindowApply function must not be null.")
-    }
-
-    val cleanReducer = clean(preAggregator)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
-    }
-
-    val cleanApply = clean(function)
-    val applyFunction = new WindowFunction[T, R, K, W] {
-      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanApply(key, window, elements.asScala, out)
-      }
-    }
-    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
-  }
-
-  // ------------------------------------------------------------------------
-  //  Aggregations on the keyed windows
-  // ------------------------------------------------------------------------
-
-  /**
-   * Applies an aggregation that gives the maximum of the elements in the window at
-   * the given position.
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
-  /**
-   * Applies an aggregation that gives the maximum of the elements in the window at
-   * the given field.
-   */
-  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
-  /**
-   * Applies an aggregation that gives the minimum of the elements in the window at
-   * the given position.
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
-  /**
-   * Applies an aggregation that gives the minimum of the elements in the window at
-   * the given field.
-   */
-  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given position.
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given field.
-   */
-  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that gives the maximum element of the window by
-   * the given position. If multiple elements are equal, the first one is returned.
-   */
-  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
-    position)
-
-  /**
-   * Applies an aggregation that gives the maximum element of the window by
-   * the given field. If multiple elements are equal, the first one is returned.
-   */
-  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
-    field)
-
-  /**
-   * Applies an aggregation that gives the minimum element of the window by
-   * the given position. If multiple elements are equal, the first one is returned.
-   */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
-    position)
-
-  /**
-   * Applies an aggregation that gives the minimum element of the window by
-   * the given field. If multiple elements are equal, the first one is returned.
-   */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
-    field)
-
-  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
-    val position = fieldNames2Indices(getInputType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }
-
-  def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaWStream[Product, K, W]]
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM =>
-        new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
-
-      case _ =>
-        new ComparableAggregator(
-          position,
-          jStream.getInputType,
-          aggregationType,
-          true,
-          jStream.getExecutionEnvironment.getConfig)
-    }
-
-    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
-  }
-
-  // ------------------------------------------------------------------------
-  //  Utilities
-  // ------------------------------------------------------------------------
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-   */
-  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
-  }
-
-  /**
-   * Gets the type of the input elements.
-   */
-  private def getInputType(): TypeInformation[T] = javaStream.getInputType
-}
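As a usage sketch for the WindowedStream above: a WindowedStream is obtained from a
KeyedStream (not part of this hunk) and then reduced or aggregated with the methods
defined in this file. The TumblingTimeWindows and Time classes are assumed from the
windowing package and are illustrative only:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts: DataStream[(String, Int)] = env
      .fromElements(("a", 1), ("b", 2), ("a", 3))
      .keyBy(0)                                         // KeyedStream, defined elsewhere
      .window(TumblingTimeWindows.of(Time.seconds(5)))  // produces the WindowedStream above
      .reduce((a, b) => (a._1, a._2 + b._2))            // pre-aggregating reduce from this file

The field-name based aggregations (sum("_2"), maxBy("_2"), and so on) resolve the
field name to a tuple position via fieldNames2Indices from the package object further
down in this commit.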

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
deleted file mode 100644
index d66cfdb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.function
-
-import org.apache.flink.api.common.functions.RichFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.state.OperatorState
-
-/**
- * Trait implementing the functionality necessary to apply stateful functions in 
- * RichFunctions without exposing the OperatorStates to the user. The user should
- * call the applyWithState method in their own RichFunction implementation.
- */
-trait StatefulFunction[I, O, S] extends RichFunction {
-  
-  var state: OperatorState[S] = _
-  val stateType: TypeInformation[S]
-
-  def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
-    val (o, s: Option[S]) = fun(in, Option(state.value()))
-    s match {
-      case Some(v) => state.update(v)
-      case None => state.update(null.asInstanceOf[S])
-    }
-    o
-  }
-
-  override def open(c: Configuration) = {
-    state = getRuntimeContext().getKeyValueState[S]("state", stateType, null.asInstanceOf[S])
-  }
-}
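To make the trait above concrete, a user function can mix it in and route every element
through applyWithState. This is a hedged sketch: RunningSum is a made-up name and
createTypeInformation is taken from the streaming Scala package object (see package.scala
below); it is not code from this commit.

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.StatefulFunction

    // Keeps a running sum in the OperatorState that the trait wires up in open().
    class RunningSum extends RichMapFunction[Long, Long]
        with StatefulFunction[Long, Long, Long] {

      // Required by the trait; used in open() to request the key/value state.
      override val stateType: TypeInformation[Long] = createTypeInformation[Long]

      override def map(in: Long): Long =
        applyWithState(in, (value, old) => {
          val sum = value + old.getOrElse(0L)
          (sum, Some(sum))  // emit the sum and store it as the new state
        })
    }

Since open() asks the runtime context for key/value state, such a function only works
when applied after a keyBy, i.e. on a keyed stream.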

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
deleted file mode 100644
index e668064..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api
-
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
-import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
-import language.implicitConversions
-
-package object scala {
-  // We have this here so that we always have generated TypeInformationS when
-  // using the Scala API
-  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
-
-  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
-    new DataStream[R](javaStream)
-    
-  implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
-  KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
-
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] =
-    new SplitStream[R](javaStream)
-
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
-  ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
-
-  implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
-    StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
-
-  private[flink] def fieldNames2Indices(
-      typeInfo: TypeInformation[_],
-      fields: Array[String]): Array[Int] = {
-    typeInfo match {
-      case ti: CaseClassTypeInfo[_] =>
-        val result = ti.getFieldIndices(fields)
-
-        if (result.contains(-1)) {
-          throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
-            "' are not valid for '" + ti.toString + "'.")
-        }
-
-        result
-
-      case _ =>
-        throw new UnsupportedOperationException("Specifying fields by name is only " +
-          "supported on Case Classes (for now).")
-    }
-  }
-
-  def createTuple2TypeInformation[T1, T2](
-      t1: TypeInformation[T1],
-      t2: TypeInformation[T2]) : TypeInformation[(T1, T2)] =
-    apiTupleCreator[T1, T2](t1, t2)
-}
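The package object above is what makes a single wildcard import sufficient for the Scala
streaming API: the createTypeInformation macro supplies implicit TypeInformation, and the
implicit conversions bridge the Java stream classes. A small hedged sketch of that effect
(the values are illustrative):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // createTypeInformation provides the implicit TypeInformation[(String, Int)]
    // that fromElements requires.
    val pairs: DataStream[(String, Int)] = env.fromElements(("flink", 1), ("streaming", 2))

    // seqToFlinkSource converts a Scala Seq into a DataStream backed by
    // the current execution environment.
    val fromSeq: DataStream[Int] = Seq(1, 2, 3)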

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
deleted file mode 100644
index 0c60719..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		//Strip the parentheses from the expected text like output
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
-				.replaceAll("[\\\\(\\\\)]", ""), resultPath);
-	}
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-
-}
-
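The ITCase above runs OutputFormatTestPrograms.wordCountToCsv, which is not included in
this hunk. A hedged sketch of what such a program could look like in the Scala streaming
API is below; the writeAsCsv call and the overall shape are assumptions for illustration,
not the actual test program:

    import org.apache.flink.streaming.api.scala._

    def wordCountToCsv(input: String, outputPath: String): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val counts: DataStream[(String, Int)] = env
        .fromElements(input.split("\n"): _*)
        .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq)
        .map((_, 1))
        .keyBy(0)
        .sum(1)

      // CsvOutputFormatITCase strips the parentheses from this tuple output
      // before comparing it against the expected word counts.
      counts.writeAsCsv(outputPath)

      env.execute("word count to CSV")
    }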

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
deleted file mode 100644
index a2a78b7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.junit.Ignore;
-
-@Ignore
-//This test sometimes fails, most likely due to the behaviour
-//of the socket. Disabled for now.
-public class SocketOutputFormatITCase extends SocketOutputTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
deleted file mode 100644
index 731222e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.StateTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class StatefulFunctionITCase extends StreamingProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		StateTestPrograms.testStatefulFunctions();
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
deleted file mode 100644
index 530ba67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class TextOutputFormatITCase extends StreamingProgramTestBase {
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
deleted file mode 100644
index 7da7bc3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.RichReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
-import org.apache.flink.streaming.runtime.operators.windowing._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.util.Collector
-
-import org.junit.Assert._
-import org.junit.{Ignore, Test}
-
-class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
-  /**
-   * These tests ensure that the fast aligned time windows operator is used if the
-   * conditions are right.
-   *
-   * TODO: update once we have an optimized aligned time windows operator for all-windows
-   */
-  @Ignore
-  @Test
-  def testFastTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .windowAll(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduce(reducer)
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
-
-    val window2 = source
-      .keyBy(0)
-      .windowAll(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
-      def apply(
-                    window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
-  }
-
-  @Test
-  def testNonEvicting(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .windowAll(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer)
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
-    val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    assertTrue(
-      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
-    val window2 = source
-      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
-      def apply(
-                    window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
-    val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-  }
-
-  @Test
-  def testEvicting(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .windowAll(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduce(reducer)
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
-    val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
-    assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-
-
-    val window2 = source
-      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .evictor(CountEvictor.of(1000))
-      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
-      def apply(
-                    window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
-    val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-  }
-
-  @Test
-  def testPreReduce(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    assertTrue(
-      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(
-      winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-  }
-
-}
-
-// ------------------------------------------------------------------------
-//  UDFs
-// ------------------------------------------------------------------------
-
-class DummyReducer extends RichReduceFunction[(String, Int)] {
-  def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
-    value1
-  }
-}
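
The translation tests above all follow the same inspection pattern: build a windowed stream, then unwrap the underlying Java transformation to assert which concrete operator the translation produced. Condensed, with result standing for any of the windowed streams built above (placeholder, not a name from the test):

    // inside such a test, with the imports shown at the top of the deleted file
    val transform = result.getJavaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator = transform.getOperator

    // the concrete operator class reveals the chosen translation path, e.g. a
    // pre-aggregating window buffer when only a reduce function was supplied
    assertTrue(operator.isInstanceOf[NonKeyedWindowOperator[_, _, _]])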

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
deleted file mode 100644
index 3c1e9c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.TimestampExtractor
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.mutable
-
-class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testCoGroup(): Unit = {
-    CoGroupJoinITCase.testResults = mutable.MutableList()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
-        ctx.collect(("a", 0))
-        ctx.collect(("a", 1))
-        ctx.collect(("a", 2))
-        ctx.collect(("b", 3))
-        ctx.collect(("b", 4))
-        ctx.collect(("b", 5))
-        ctx.collect(("a", 6))
-        ctx.collect(("a", 7))
-        ctx.collect(("a", 8))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
-
-    val source2 = env.addSource(new SourceFunction[(String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
-        ctx.collect(("a", 0))
-        ctx.collect(("a", 1))
-        ctx.collect(("b", 3))
-        ctx.collect(("c", 6))
-        ctx.collect(("c", 7))
-        ctx.collect(("c", 8))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
-
-    source1.coGroup(source2)
-      .where(_._1)
-      .equalTo(_._1)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
-          "F:" + first.mkString("") + " S:" + second.mkString("")
-      }
-      .addSink(new SinkFunction[String]() {
-        def invoke(value: String) {
-          CoGroupJoinITCase.testResults += value
-        }
-      })
-
-    env.execute("CoGroup Test")
-
-    val expectedResult = mutable.MutableList(
-      "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
-      "F:(b,3)(b,4)(b,5) S:(b,3)",
-      "F:(a,6)(a,7)(a,8) S:",
-      "F: S:(c,6)(c,7)(c,8)")
-
-    assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
-  }
-
-  @Test
-  def testJoin(): Unit = {
-    CoGroupJoinITCase.testResults = mutable.MutableList()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
-        ctx.collect(("a", "x", 0))
-        ctx.collect(("a", "y", 1))
-        ctx.collect(("a", "z", 2))
-
-        ctx.collect(("b", "u", 3))
-        ctx.collect(("b", "w", 5))
-
-        ctx.collect(("a", "i", 6))
-        ctx.collect(("a", "j", 7))
-        ctx.collect(("a", "k", 8))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
-
-    val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
-        ctx.collect(("a", "u", 0))
-        ctx.collect(("a", "w", 1))
-
-        ctx.collect(("b", "i", 3))
-        ctx.collect(("b", "k", 5))
-
-        ctx.collect(("a", "x", 6))
-        ctx.collect(("a", "z", 8))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
-
-    source1.join(source2)
-      .where(_._1)
-      .equalTo(_._1)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .apply( (l, r) => l.toString + ":" + r.toString)
-      .addSink(new SinkFunction[String]() {
-        def invoke(value: String) {
-          CoGroupJoinITCase.testResults += value
-        }
-      })
-
-    env.execute("Join Test")
-
-    val expectedResult = mutable.MutableList(
-      "(a,x,0):(a,u,0)",
-      "(a,x,0):(a,w,1)",
-      "(a,y,1):(a,u,0)",
-      "(a,y,1):(a,w,1)",
-      "(a,z,2):(a,u,0)",
-      "(a,z,2):(a,w,1)",
-      "(b,u,3):(b,i,3)",
-      "(b,u,3):(b,k,5)",
-      "(b,w,5):(b,i,3)",
-      "(b,w,5):(b,k,5)",
-      "(a,i,6):(a,x,6)",
-      "(a,i,6):(a,z,8)",
-      "(a,j,7):(a,x,6)",
-      "(a,j,7):(a,z,8)",
-      "(a,k,8):(a,x,6)",
-      "(a,k,8):(a,z,8)")
-
-    assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelfJoin(): Unit = {
-    CoGroupJoinITCase.testResults = mutable.MutableList()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
-        ctx.collect(("a", "x", 0))
-        ctx.collect(("a", "y", 1))
-        ctx.collect(("a", "z", 2))
-
-        ctx.collect(("b", "u", 3))
-        ctx.collect(("b", "w", 5))
-
-        ctx.collect(("a", "i", 6))
-        ctx.collect(("a", "j", 7))
-        ctx.collect(("a", "k", 8))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
-
-    source1.join(source1)
-      .where(_._1)
-      .equalTo(_._1)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .apply( (l, r) => l.toString + ":" + r.toString)
-      .addSink(new SinkFunction[String]() {
-      def invoke(value: String) {
-        CoGroupJoinITCase.testResults += value
-      }
-    })
-
-    env.execute("Self-Join Test")
-
-    val expectedResult = mutable.MutableList(
-      "(a,x,0):(a,x,0)",
-      "(a,x,0):(a,y,1)",
-      "(a,x,0):(a,z,2)",
-      "(a,y,1):(a,x,0)",
-      "(a,y,1):(a,y,1)",
-      "(a,y,1):(a,z,2)",
-      "(a,z,2):(a,x,0)",
-      "(a,z,2):(a,y,1)",
-      "(a,z,2):(a,z,2)",
-      "(b,u,3):(b,u,3)",
-      "(b,u,3):(b,w,5)",
-      "(b,w,5):(b,u,3)",
-      "(b,w,5):(b,w,5)",
-      "(a,i,6):(a,i,6)",
-      "(a,i,6):(a,j,7)",
-      "(a,i,6):(a,k,8)",
-      "(a,j,7):(a,i,6)",
-      "(a,j,7):(a,j,7)",
-      "(a,j,7):(a,k,8)",
-      "(a,k,8):(a,i,6)",
-      "(a,k,8):(a,j,7)",
-      "(a,k,8):(a,k,8)")
-
-    assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted)
-  }
-
-}
-
-
-object CoGroupJoinITCase {
-  private var testResults: mutable.MutableList[String] = null
-
-  private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
-    def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2
-    }
-
-    def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2 - 1
-    }
-
-    def getCurrentWatermark: Long = {
-      Long.MinValue
-    }
-  }
-
-  private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] {
-    def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = {
-      element._3
-    }
-
-    def extractWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
-      element._3 - 1
-    }
-
-    def getCurrentWatermark: Long = {
-      Long.MinValue
-    }
-  }
-}
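
Distilled from the three tests above, the windowed coGroup and join APIs share the same shape: select the key of each input with where/equalTo, pick a window assigner, then apply a function; for coGroup the function receives the complete iterators of both sides per key and window, while for join it receives one matching pair at a time. With source1 and source2 standing for the timestamped (String, Int) streams built in testCoGroup:

    // coGroup: group-at-a-time view of both inputs
    source1.coGroup(source2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
        "F:" + first.mkString("") + " S:" + second.mkString("")
      }

    // join: one output element per matching pair within the window
    source1.join(source2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .apply((l, r) => l.toString + ":" + r.toString)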