Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:32 UTC

[16/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
deleted file mode 100644
index 08ce890..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.join;
-
-import org.apache.flink.streaming.scala.examples.join.WindowJoin;
-import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class WindowJoinITCase extends StreamingProgramTestBase {
-
-	protected String gradesPath;
-	protected String salariesPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		// Since the two sides of the join may progress at different speeds,
-		// the exact output cannot be checked; we only verify that each result
-		// line is well-formed, e.g. Person(bob,2,2015)
-		checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
-	}
-}
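
For reference, the shape-only validation above can be sketched standalone. A
minimal Scala sketch of the same regex check (the helper name verifyJoinOutput
and the use of scala.io.Source are illustrative assumptions; the test base
provides the real checkLinesAgainstRegexp):

    import scala.io.Source

    // Hypothetical stand-in for checkLinesAgainstRegexp: every line of the
    // result file must match the expected shape, e.g. Person(bob,2,2015).
    def verifyJoinOutput(resultPath: String): Unit = {
      val pattern = "^Person\\([a-z]+,(\\d),(\\d)+\\)".r
      for (line <- Source.fromFile(resultPath).getLines()) {
        require(pattern.findFirstIn(line).isDefined, s"Malformed line: $line")
      }
    }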

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
deleted file mode 100644
index b3629ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.socket;
-
-import org.apache.flink.streaming.scala.examples.socket.SocketTextStreamWordCount;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
-
-public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
deleted file mode 100644
index ef4e47f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.windowing;
-
-import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing;
-import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		setParallelism(1); // needed to ensure total ordering for windows
-		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		TopSpeedWindowing.main(new String[]{textPath, resultPath});
-
-	}
-}
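
The setParallelism(1) call above is what makes the exact line comparison in
postSubmit possible. A minimal sketch of the same idea against the Scala API of
this branch (variable names are illustrative):

    import org.apache.flink.streaming.api.scala._

    // With parallelism 1 every operator runs as a single task, so window
    // results are emitted in a deterministic order and the output can be
    // compared against a fixed expected result.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)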

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
deleted file mode 100644
index aa0d67e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
+++ /dev/null
@@ -1,236 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-scala</artifactId>
-	<name>flink-streaming-scala</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-reflect</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-compiler</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.ow2.asm</groupId>
-			<artifactId>asm</artifactId>
-			<version>${asm.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<!-- To access general test utils -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<!-- To access test data -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- To access streaming test utils -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
- 
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-					<compilerPlugins combine.children="append">
-					   <compilerPlugin>
-						   <groupId>org.scalamacros</groupId>
-						   <artifactId>paradise_${scala.version}</artifactId>
-						   <version>${scala.macros.version}</version>
-					   </compilerPlugin>
-				   </compilerPlugins>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<version>0.5.0</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<verbose>false</verbose>
-					<failOnViolation>true</failOnViolation>
-					<includeTestSourceDirectory>true</includeTestSourceDirectory>
-					<failOnWarning>false</failOnWarning>
-					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-					<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
-					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-					<outputEncoding>UTF-8</outputEncoding>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
deleted file mode 100644
index 0357144..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.reflect.ClassTag
-
-import scala.collection.JavaConverters._
-
-/**
- * An [[AllWindowedStream]] represents a data stream where the stream of
- * elements is split into windows based on a
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission
- * is triggered based on a [[Trigger]].
- *
- * If an [[Evictor]] is specified, it will be used to evict elements from the
- * window after evaluation was triggered by the [[Trigger]], but before the
- * actual evaluation of the window. When using an evictor, window performance
- * will degrade significantly, since pre-aggregation of window results cannot
- * be used.
- *
- * Note that the [[AllWindowedStream]] is purely an API construct; at runtime
- * it will be collapsed together with the operation over the window into one
- * single operation.
- *
- * @tparam T The type of elements in the stream.
- * @tparam W The type of [[Window]] that the
- *           [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]
- *           assigns the elements to.
- */
-class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
-
-  /**
-   * Sets the [[Trigger]] that should be used to trigger window emission.
-   */
-  def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
-    javaStream.trigger(trigger)
-    this
-  }
-
-  /**
-   * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
-   *
-   * Note: When using an evictor, window performance will degrade significantly, since
-   * pre-aggregation of window results cannot be used.
-   */
-  def evictor(evictor: Evictor[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
-    javaStream.evictor(evictor)
-    this
-  }
-
-  // ------------------------------------------------------------------------
-  //  Operations on the windows
-  // ------------------------------------------------------------------------
-
-  /**
-   * Applies a reduce function to the window. The reduce function is called for
-   * each evaluation of the window. The output of the reduce function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * This window will try to pre-aggregate data as much as the window policies permit. For example,
-   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
-   * interval, so a few elements are stored per key (one per slide interval).
-   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-   * aggregation tree.
-   *
-   * @param function The reduce function.
-   * @return The data stream that is the result of applying the reduce function to the window.
-   */
-  def reduce(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduce(clean(function))
-  }
-
-  /**
-   * Applies a reduce function to the window. The reduce function is called for
-   * each evaluation of the window. The output of the reduce function is
-   * interpreted as a regular non-windowed stream.
-   *
-   * This window will try to pre-aggregate data as much as the window policies permit. For example,
-   * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-   * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
-   * interval, so a few elements are stored per key (one per slide interval).
-   * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-   * aggregation tree.
-   *
-   * @param function The reduce function.
-   * @return The data stream that is the result of applying the reduce function to the window.
-   */
-  def reduce(function: (T, T) => T): DataStream[T] = {
-    if (function == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val cleanFun = clean(function)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies the given fold function to each window. The fold function is
-   * called for each evaluation of the window. The output of the fold function
-   * is interpreted as a regular non-windowed stream.
-   *
-   * @param function The fold function.
-   * @return The data stream that is the result of applying the fold function to the window.
-   */
-  def fold[R: TypeInformation: ClassTag](
-      initialValue: R,
-      function: FoldFunction[T,R]): DataStream[R] = {
-    if (function == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-
-    val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
-    javaStream.fold(initialValue, function, resultType)
-  }
-
-  /**
-   * Applies the given fold function to each window. The fold function is
-   * called for each evaluation of the window. The output of the fold function
-   * is interpreted as a regular non-windowed stream.
-   *
-   * @param function The fold function.
-   * @return The data stream that is the result of applying the fold function to the window.
-   */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
-    if (function == null) {
-      throw new NullPointerException("Fold function must not be null.")
-    }
-    val cleanFun = clean(function)
-    val folder = new FoldFunction[T,R] {
-      def fold(acc: R, v: T) = {
-        cleanFun(acc, v)
-      }
-    }
-    fold(initialValue, folder)
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is
-   * called for each evaluation of the window. The output of the window
-   * function is interpreted as a regular non-windowed stream.
-   *
-   * Note that this function requires that all data in the window is buffered until the
-   * window is evaluated, as the function provides no means of pre-aggregation.
-   *
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](function: AllWindowFunction[T, R, W]): DataStream[R] = {
-    javaStream.apply(clean(function), implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is
-   * called for each evaluation of the window. The output of the window
-   * function is interpreted as a regular non-windowed stream.
-   *
-   * Note that this function requires that all data in the window is buffered until the
-   * window is evaluated, as the function provides no means of pre-aggregation.
-   *
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](
-      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
-    val cleanedFunction = clean(function)
-    val applyFunction = new AllWindowFunction[T, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanedFunction(window, elements.asScala, out)
-      }
-    }
-    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is
-   * called for each evaluation of the window. The output of the window
-   * function is interpreted as a regular non-windowed stream.
-   *
-   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
-   *
-   * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](
-      preAggregator: ReduceFunction[T],
-      function: AllWindowFunction[T, R, W]): DataStream[R] = {
-    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies the given window function to each window. The window function is
-   * called for each evaluation of the window. The output of the window
-   * function is interpreted as a regular non-windowed stream.
-   *
-   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
-   *
-   * @param preAggregator The reduce function that is used for pre-aggregation
-   * @param function The window function.
-   * @return The data stream that is the result of applying the window function to the window.
-   */
-  def apply[R: TypeInformation: ClassTag](
-      preAggregator: (T, T) => T,
-      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
-    if (preAggregator == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    if (function == null) {
-      throw new NullPointerException("WindowApply function must not be null.")
-    }
-
-    val cleanReducer = clean(preAggregator)
-    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
-    }
-
-    val cleanApply = clean(function)
-    val applyFunction = new AllWindowFunction[T, R, W] {
-      def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
-        cleanApply(window, elements.asScala, out)
-      }
-    }
-    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
-  }
-
-  // ------------------------------------------------------------------------
-  //  Aggregations on the windows
-  // ------------------------------------------------------------------------
-
-  /**
-   * Applies an aggregation that gives the maximum of the elements in the window at
-   * the given position.
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
-  /**
-   * Applies an aggregation that gives the maximum of the elements in the window at
-   * the given field.
-   */
-  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
-  /**
-   * Applies an aggregation that gives the minimum of the elements in the window at
-   * the given position.
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
-  /**
-   * Applies an aggregation that gives the minimum of the elements in the window at
-   * the given field.
-   */
-  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given position.
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given field.
-   */
-  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that gives the maximum element of the window by
-   * the given position. In case of equality, the first element is returned.
-   */
-  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
-    position)
-
-  /**
-   * Applies an aggregation that gives the maximum element of the window by
-   * the given field. In case of equality, the first element is returned.
-   */
-  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
-    field)
-
-  /**
-   * Applies an aggregation that gives the minimum element of the window by
-   * the given position. In case of equality, the first element is returned.
-   */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
-    position)
-
-  /**
-   * Applies an aggregation that gives the minimum element of the window by
-   * the given field. In case of equality, the first element is returned.
-   */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
-    field)
-
-  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
-    val position = fieldNames2Indices(getInputType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }
-
-  def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaAllWStream[Product, W]]
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM =>
-        new SumAggregator(position, jStream.getInputType, jStream.getExecutionEnvironment.getConfig)
-
-      case _ =>
-        new ComparableAggregator(
-          position,
-          jStream.getInputType,
-          aggregationType,
-          true,
-          jStream.getExecutionEnvironment.getConfig)
-    }
-
-    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
-  }
-
-  // ------------------------------------------------------------------------
-  //  Utilities
-  // ------------------------------------------------------------------------
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-   */
-  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
-  }
-
-  /**
-   * Gets the input type, i.e. the type of the elements in the stream.
-   */
-  private def getInputType(): TypeInformation[T] = javaStream.getInputType
-}
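
For reference, a minimal sketch of driving the AllWindowedStream API above,
using the 0.10-era window assigners its scaladoc refers to (input data and
variable names are illustrative assumptions):

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val readings: DataStream[(String, Int)] = env.fromElements(("a", 1), ("a", 2), ("b", 3))

    // Non-keyed window over the whole stream. sum() goes through the
    // pre-aggregating reduce path, so at most one element needs to be
    // buffered per window.
    val sums: DataStream[(String, Int)] = readings
      .windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
      .sum(1)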

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
deleted file mode 100644
index e676f81..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
-import org.apache.flink.streaming.api.windowing.evictors.Evictor
-import org.apache.flink.streaming.api.windowing.triggers.Trigger
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-import scala.collection.JavaConverters._
-
-import scala.reflect.ClassTag
-
-/**
- * `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
- * A streaming co-group operation is evaluated over elements in a window.
- *
- * To finalize the co-group operation you also need to specify a [[KeySelector]] for
- * both the first and second input, as well as a [[WindowAssigner]].
- *
- * Note: Right now, the groups are being built in memory so you need to ensure that they don't
- * get too big. Otherwise the JVM might crash.
- *
- * Example:
- *
- * {{{
- * val one: DataStream[(String, Int)] = ...
- * val two: DataStream[(String, Int)] = ...
- *
- * val result = one.coGroup(two)
- *     .where(new MyFirstKeySelector())
- *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
- *     .apply(new MyCoGroupFunction())
- * }}}
- */
-object CoGroupedStreams {
-
-  /**
-   * A co-group operation that does not yet have its [[KeySelector]]s defined.
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
-   */
-  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the first input.
-     */
-    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val keyType = implicitly[TypeInformation[KEY]]
-      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T1) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = keyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
-    }
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the second input.
-     */
-    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val keyType = implicitly[TypeInformation[KEY]]
-      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T2) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = keyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
-    }
-  }
-
-  /**
-   * A co-group operation that has [[KeySelector]]s defined for either both or
-   * one input.
-   *
-   * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]]
-   * before you can proceed with specifying a [[WindowAssigner]] using [[window()]].
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
-   * @tparam KEY Type of the key. This must be the same for both inputs
-   */
-  class WithKey[T1, T2, KEY](
-      input1: DataStream[T1],
-      input2: DataStream[T2],
-      keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY],
-      keyType: TypeInformation[KEY]) {
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the first input.
-     */
-    def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val localKeyType = keyType
-      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T1) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = localKeyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, keyType)
-    }
-
-    /**
-     * Specifies a [[KeySelector]] for elements from the second input.
-     */
-    def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
-      val cleanFun = clean(keySelector)
-      val localKeyType = keyType
-      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
-        def getKey(in: T2) = cleanFun(in)
-        override def getProducedType: TypeInformation[KEY] = localKeyType
-      }
-      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, keyType)
-    }
-
-    /**
-     * Specifies the window on which the co-group operation works.
-     */
-    def window[W <: Window](
-        assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
-        : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
-      if (keySelector1 == null || keySelector2 == null) {
-        throw new UnsupportedOperationException("You first need to specify KeySelectors for both " +
-          "inputs using where() and equalTo().")
-      }
-      new CoGroupedStreams.WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        clean(assigner),
-        null,
-        null)
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
-    }
-  }
-
-  /**
-   * A co-group operation that has [[KeySelector]]s defined for both inputs as
-   * well as a [[WindowAssigner]].
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
-   * @tparam KEY Type of the key. This must be the same for both inputs
-   * @tparam W Type of [[Window]] on which the co-group operation works.
-   */
-  class WithWindow[T1, T2, KEY, W <: Window](
-      input1: DataStream[T1],
-      input2: DataStream[T2],
-      keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY],
-      windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
-      trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
-      evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
-
-
-    /**
-     * Sets the [[Trigger]] that should be used to trigger window emission.
-     */
-    def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
-    : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
-      new WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        windowAssigner,
-        newTrigger,
-        evictor)
-    }
-
-    /**
-     * Sets the [[Evictor]] that should be used to evict elements from a window before emission.
-     *
-     * Note: When using an evictor, window performance will degrade significantly, since
-     * pre-aggregation of window results cannot be used.
-     */
-    def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
-    : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
-      new WithWindow[T1, T2, KEY, W](
-        input1,
-        input2,
-        keySelector1,
-        keySelector2,
-        windowAssigner,
-        trigger,
-        newEvictor)
-    }
-
-    /**
-     * Completes the co-group operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[O: TypeInformation: ClassTag](
-        fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
-      require(fun != null, "CoGroup function must not be null.")
-
-      val coGrouper = new CoGroupFunction[T1, T2, O] {
-        val cleanFun = clean(fun)
-        def coGroup(
-            left: java.lang.Iterable[T1],
-            right: java.lang.Iterable[T2], out: Collector[O]) = {
-          out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala))
-        }
-      }
-      apply(coGrouper)
-    }
-
-    /**
-     * Completes the co-group operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[O: TypeInformation: ClassTag](
-        fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
-      require(fun != null, "CoGroup function must not be null.")
-
-      val coGrouper = new CoGroupFunction[T1, T2, O] {
-        val cleanFun = clean(fun)
-        def coGroup(
-            left: java.lang.Iterable[T1],
-            right: java.lang.Iterable[T2], out: Collector[O]) = {
-          cleanFun(left.iterator.asScala, right.iterator.asScala, out)
-        }
-      }
-      apply(coGrouper)
-    }
-
-    /**
-     * Completes the co-group operation with the user function that is executed
-     * for windowed groups.
-     */
-    def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
-
-      val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
-
-      coGroup
-        .where(keySelector1)
-        .equalTo(keySelector2)
-        .window(windowAssigner)
-        .trigger(trigger)
-        .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
-    }
-
-    /**
-     * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-     * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-     */
-    private[flink] def clean[F <: AnyRef](f: F): F = {
-      new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
-    }
-  }
-
-
-  /**
-   * Creates a new co-group operation from the two given inputs.
-   */
-  def createCoGroup[T1, T2](input1: DataStream[T1], input2: DataStream[T2])
-      : CoGroupedStreams.Unspecified[T1, T2] = {
-    new CoGroupedStreams.Unspecified[T1, T2](input1, input2)
-  }
-
-}
-
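
The scaladoc example above uses explicit KeySelector classes; in the Scala API
the staged builder is normally driven with plain functions. A minimal sketch
under the same assumptions (illustrative inputs; assumes the DataStream.coGroup
entry point that delegates to createCoGroup above):

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val grades: DataStream[(String, Int)] = env.fromElements(("john", 5), ("lisa", 3))
    val salaries: DataStream[(String, Int)] = env.fromElements(("john", 4000))

    // The builder stages enforce the order: keys for both inputs first
    // (where/equalTo), then the window, then the user function.
    val counts: DataStream[(Int, Int)] = grades.coGroup(salaries)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
      .apply((gs: Iterator[(String, Int)], ss: Iterator[(String, Int)]) => (gs.size, ss.size))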

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
deleted file mode 100644
index 3ff773f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.{TypeExtractor, ResultTypeQueryable}
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, KeyedStream => JKeyedStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
-import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
-
-/**
- * [[ConnectedStreams]] represents two connected streams of (possibly) different data types. It
- * can be used to apply transformations such as [[CoMapFunction]] on two
- * [[DataStream]]s.
- */
-class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
-
-  /**
-   * Applies a CoMap transformation on the connected streams and maps the
-   * output to a common type. fun1 is called for each element of the first
-   * input and fun2 for each element of the second input. Each call returns
-   * exactly one element.
-   *
-   * @param fun1 The function called for each element of the first input
-   * @param fun2 The function called for each element of the second input
-   * @return The transformed [[DataStream]]
-   */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
-  DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val comapper = new CoMapFunction[IN1, IN2, R] {
-      def map1(in1: IN1): R = cleanFun1(in1)
-      def map2(in2: IN2): R = cleanFun2(in2)
-    }
-
-    map(comapper)
-  }
-
-  /**
-   * Applies a CoMap transformation on the connected streams and maps the
-   * output to a common type. The transformation calls
-   * [[CoMapFunction#map1]] for each element of the first input and
-   * [[CoMapFunction#map2]] for each element of the second input. Each
-   * CoMapFunction call returns exactly one element. The user can also extend
-   * RichCoMapFunction to gain access to the features provided by the
-   * RichFunction interface.
-   *
-   * @param coMapper The CoMapFunction used to jointly transform the two
-   *                 input DataStreams
-   * @return The transformed [[DataStream]]
-   */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coMapper == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on the connected streams and maps the
-   * output to a common type. The transformation calls
-   * [[CoFlatMapFunction#flatMap1]] for each element of the first input and
-   * [[CoFlatMapFunction#flatMap2]] for each element of the second input. Each
-   * CoFlatMapFunction call returns any number of elements, including none.
-   * The user can also extend RichCoFlatMapFunction to gain access to the
-   * features provided by the RichFunction interface.
-   *
-   * @param coFlatMapper The CoFlatMapFunction used to jointly transform the
-   *                     two input DataStreams
-   * @return The transformed [[DataStream]]
-   */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coFlatMapper == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on the connected streams and maps the
-   * output to a common type. fun1 is called for each element of the first
-   * input and fun2 for each element of the second input. Each call may emit
-   * any number of elements, including none.
-   *
-   * @param fun1 The function called for each element of the first input
-   * @param fun2 The function called for each element of the second input
-   * @return The transformed [[DataStream]]
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
-      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("FlatMap functions must not be null.")
-    }
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
-      def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on the connected streams and maps the
-   * output to a common type. fun1 is called for each element of the first
-   * input and fun2 for each element of the second input. Each call may return
-   * any number of elements, including none.
-   *
-   * @param fun1 The function called for each element of the first input
-   * @param fun2 The function called for each element of the second input
-   * @return The transformed [[DataStream]]
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
-      fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("FlatMap functions must not be null.")
-    }
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
-      def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * GroupBy operation for connected data streams. Groups the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-   * applying functions on grouped data streams, for example
-   * [[ConnectedStreams#reduce]].
-   *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return The transformed [[ConnectedStreams]]
-   */
-  def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(keyPosition1, keyPosition2)
-  }
-
-  /**
-   * GroupBy operation for connected data streams. Groups the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-   * applying functions on grouped data streams, for example
-   * [[ConnectedStreams#reduce]].
-   *
-   * @param keyPositions1
-   * The fields used to group the first input stream.
-   * @param keyPositions2
-   * The fields used to group the second input stream.
-   * @return The transformed [[ConnectedStreams]]
-   */
-  def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(keyPositions1, keyPositions2)
-  }
-
-  /**
-   * GroupBy operation for connected data streams using key expressions. Groups
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the [[DataStream]]'s underlying type. A dot can be used
-   * to drill down into objects, as in `"field1.getInnerField2()"`.
-   *
-   * @param field1
-   * The grouping expression for the first input
-   * @param field2
-   * The grouping expression for the second input
-   * @return The grouped [[ConnectedStreams]]
-   */
-  def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(field1, field2)
-  }
-
-  /**
-   * GroupBy operation for connected data streams using key expressions. Groups
-   * the elements of input1 and input2 according to fields1 and fields2. A
-   * field expression is either the name of a public field or a getter method
-   * with parentheses of the [[DataStream]]'s underlying type. A dot can be
-   * used to drill down into objects, as in `"field1.getInnerField2()"`.
-   *
-   * @param fields1
-   * The grouping expressions for the first input
-   * @param fields2
-   * The grouping expressions for the second input
-   * @return The grouped [[ConnectedStreams]]
-   */
-  def keyBy(fields1: Array[String], fields2: Array[String]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(fields1, fields2)
-  }
-
-  /**
-   * GroupBy operation for connected data streams. Groups the elements of
-   * input1 and input2 using fun1 and fun2. Used for applying
-   * functions on grouped data streams, for example
-   * [[ConnectedStreams#reduce]].
-   *
-   * @param fun1
-   * The function used for grouping the first input
-   * @param fun2
-   * The function used for grouping the second input
-   * @return The grouped [[ConnectedStreams]]
-   */
-  def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: IN2 => K2):
-  ConnectedStreams[IN1, IN2] = {
-
-    val keyType1 = implicitly[TypeInformation[K1]]
-    val keyType2 = implicitly[TypeInformation[K2]]
-    
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    
-    val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
-    val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
-    
-    javaStream.keyBy(keyExtractor1, keyExtractor2)
-  }
-
-  /**
-   * PartitionBy operation for connected data streams. Partitions the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2.
-   *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return The transformed [[ConnectedStreams]]
-   */
-  def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(keyPosition1, keyPosition2)
-  }
-
-  /**
-   * PartitionBy operation for connected data streams. Partitions the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2.
-   *
-   * @param keyPositions1
-   * The fields used to partition the first input stream.
-   * @param keyPositions2
-   * The fields used to partition the second input stream.
-   * @return The transformed [[ConnectedStreams]]
-   */
-  def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(keyPositions1, keyPositions2)
-  }
-
-  /**
-   * PartitionBy operation for connected data streams using key expressions.
-   * Partitions the elements of input1 and input2 according to field1 and
-   * field2. A field expression is either the name of a public field or a
-   * getter method with parentheses of the [[DataStream]]'s underlying type. A
-   * dot can be used to drill down into objects, as in `"field1.getInnerField2()"`.
-   *
-   * @param field1
-   * The partitioning expression for the first input
-   * @param field2
-   * The partitioning expression for the second input
-   * @return The partitioned [[ConnectedStreams]]
-   */
-  def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(field1, field2)
-  }
-
-  /**
-   * PartitionBy operation for connected data streams using key expressions. Partitions
-   * the elements of input1 and input2 according to fields1 and fields2.
-   *
-   * @param fields1
-   * The partitioning expressions for the first input
-   * @param fields2
-   * The partitioning expressions for the second input
-   * @return The partitioned [[ConnectedStreams]]
-   */
-  def partitionByHash(fields1: Array[String], fields2: Array[String]):
-  ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(fields1, fields2)
-  }
-
-  /**
-   * PartitionBy operation for connected data streams. Partitions the elements of
-   * input1 and input2 using fun1 and fun2.
-   *
-   * @param fun1
-   * The function used for partitioning the first input
-   * @param fun2
-   * The function used for partitioning the second input
-   * @return The partitioned [[ConnectedStreams]]
-   */
-  def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
-  ConnectedStreams[IN1, IN2] = {
-
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-
-    val keyExtractor1 = new KeySelector[IN1, K] {
-      def getKey(in: IN1) = cleanFun1(in)
-    }
-    val keyExtractor2 = new KeySelector[IN2, L] {
-      def getKey(in: IN2) = cleanFun2(in)
-    }
-
-    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
-  }
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
-   */
-  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
-  }
-
-}
-
-class KeySelectorWithType[IN, K](
-        private[this] val fun: IN => K,
-        private[this] val info: TypeInformation[K])
-  extends KeySelector[IN, K] with ResultTypeQueryable[K] {
-  
-  override def getKey(value: IN): K = fun(value)
-
-  override def getProducedType: TypeInformation[K] = info
-}
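
A minimal sketch of driving the ConnectedStreams API above: connect() pairs two
streams of different element types, and the two-function map variant produces a
common output type (inputs and names are illustrative):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[String] = env.fromElements("one", "two")
    val numbers: DataStream[Int] = env.fromElements(1, 2)

    // Both element types are kept; the two map functions run in a single
    // operator and must agree on a common result type.
    val described: DataStream[String] = words
      .connect(numbers)
      .map(w => "word: " + w, n => "number: " + n)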