Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/31 10:24:36 UTC

[GitHub] aljoscha closed pull request #6894: [FLINK-7811] Add support for Scala 2.12

aljoscha closed pull request #6894: [FLINK-7811] Add support for Scala 2.12
URL: https://github.com/apache/flink/pull/6894
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff of a foreign (fork-based) pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/.travis.yml b/.travis.yml
index d4b8abafff0..634a2f61cc4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -76,25 +76,25 @@ jobs:
     # main profile
     - stage: compile
       script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: compile
     - stage: test
       script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: core
     - script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: libraries
     - script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: connectors
     - script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: tests
     - script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: misc
     - stage: cleanup
       script: ./tools/travis_controller.sh
-      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws"
+      env: PROFILE="-Dhadoop.version=2.8.3 -Pinclude-kinesis -Dinclude_hadoop_aws -Pscala-2.11"
       name: cleanup
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index df1afb57aa5..52f0ef17b9a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -62,6 +62,14 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
+			<exclusions>
+				<!-- we need to exclude kafka-clients here because the base module now depends on it,
+				but no such artifact exists for Kafka 0.8 -->
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka-clients</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 3e0aa46e095..b6056d8332e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -135,14 +135,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
-			<!-- include 0.9 server for tests  -->
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_${scala.binary.version}</artifactId>
-			<version>${kafka.version}</version>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-jmx</artifactId>
@@ -191,6 +183,21 @@ under the License.
 	</dependencies>
 
 	<profiles>
+		<!-- The kafka_${...} dependency doesn't exist for Scala 2.12, so we need to exclude
+		this (test-)dependency when building for Scala 2.12. -->
+		<profile>
+			<id>scala-2.11</id>
+			<dependencies>
+				<dependency>
+					<!-- include 0.9 server for tests  -->
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+					<version>${kafka.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+
 		<!-- Create SQL Client uber jars by default -->
 		<profile>
 			<id>sql-jars</id>
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index d6a6bb6d5ea..7a2ab1d4879 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<kafka.version>0.8.2.2</kafka.version>
+		<kafka.version>0.10.2.1</kafka.version>
 	</properties>
 
 	<dependencies>
@@ -82,10 +82,19 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
 			<version>${kafka.version}</version>
+			<scope>test</scope>
 			<exclusions>
 				<exclusion>
 					<groupId>com.sun.jmx</groupId>
@@ -126,8 +135,6 @@ under the License.
 			</exclusions>
 		</dependency>
 
-		<!-- test dependencies -->
-
 		<!-- force using the latest zkclient -->
 		<dependency>
 			<groupId>com.101tec</groupId>
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index d7a851a8063..12ca5430415 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -42,7 +42,6 @@ under the License.
 		<module>flink-hbase</module>
 		<module>flink-hcatalog</module>
 		<module>flink-connector-kafka-base</module>
-		<module>flink-connector-kafka-0.8</module>
 		<module>flink-connector-kafka-0.9</module>
 		<module>flink-connector-kafka-0.10</module>
 		<module>flink-connector-kafka-0.11</module>
@@ -83,6 +82,14 @@ under the License.
 
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
+		<!-- there's no Kafka 0.8 dependency for Scala 2.12, so we only include this module
+		when building for Scala 2.11 -->
+		<profile>
+			<id>scala-2.11</id>
+			<modules>
+				<module>flink-connector-kafka-0.8</module>
+			</modules>
+		</profile>
 		<!--
 			We include the kinesis module only optionally because it contains a dependency
 			licenced under the "Amazon Software License".
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 5ac914e03ef..af36655f2e1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -401,7 +401,8 @@ private Kryo getKryoInstance() {
 		try {
 			// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
 			// This will be true if Flink's Scala API is used.
-			Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
+			Class<?> chillInstantiatorClazz =
+					Class.forName("org.apache.flink.runtime.types.FlinkScalaKryoInstantiator");
 			Object chillInstantiator = chillInstantiatorClazz.newInstance();
 
 			// obtain a Kryo instance through Twitter Chill
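The reflective lookup above now targets Flink's own FlinkScalaKryoInstantiator (added further
down in this diff) instead of Chill's class. A minimal stand-alone sketch of that fallback
pattern, assuming a newKryo() factory method on the instantiator (as in the Scala class later
in this diff); this is not the exact Flink code path:

    import com.esotericsoftware.kryo.Kryo

    object KryoFactorySketch {
      // Load the Scala-aware instantiator only if it is on the class path (i.e. the Scala API
      // is used); otherwise fall back to a plain Kryo instance.
      def createKryo(): Kryo =
        try {
          val clazz = Class.forName("org.apache.flink.runtime.types.FlinkScalaKryoInstantiator")
          val instantiator = clazz.newInstance().asInstanceOf[AnyRef]
          clazz.getMethod("newKryo").invoke(instantiator).asInstanceOf[Kryo]
        } catch {
          case _: ClassNotFoundException => new Kryo()
        }
    }
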
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 29885be93a2..cb8c9cfc713 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -169,6 +169,7 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
 		scalaTypes.add("scala.Tuple2$mcDJ$sp");
 		scalaTypes.add("scala.Tuple2$mcDI$sp");
 		scalaTypes.add("scala.Tuple2$mcDD$sp");
+		scalaTypes.add("scala.Enumeration$ValueSet");
 	}
 
 	/**
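The new entry registers scala.Enumeration$ValueSet next to the other Scala classes that this
ClassLoaderObjectInputStream resolves specially. Such values come from ordinary Enumeration
usage and can end up in serialized user state; a small illustrative sketch (not Flink code):

    object EnumerationValueSetSketch {
      object Color extends Enumeration {
        val Red, Green, Blue = Value
      }

      // the runtime class of this value is scala.Enumeration$ValueSet
      val nonGreen: Color.ValueSet = Color.values.filter(_ != Color.Green)
    }
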
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 1f3a32b0a16..8f87b91bbed 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -67,11 +67,14 @@ object TransitiveClosureNaive {
       val terminate = prevPaths
         .coGroup(nextPaths)
         .where(0).equalTo(0) {
-          (prev, next, out: Collector[(Long, Long)]) => {
-            val prevPaths = prev.toSet
-            for (n <- next)
-              if (!prevPaths.contains(n)) out.collect(n)
-          }
+          (
+            prev: Iterator[(Long, Long)],
+            next: Iterator[(Long, Long)],
+            out: Collector[(Long, Long)]) => {
+              val prevPaths = prev.toSet
+              for (n <- next)
+                if (!prevPaths.contains(n)) out.collect(n)
+            }
       }.withForwardedFieldsSecond("*")
       (nextPaths, terminate)
     }
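The explicitly typed closure parameters here (and in several Scala sources further down) are
presumably needed because Scala 2.12's SAM conversion makes the un-annotated closure ambiguous
between the overload taking a Scala function and the one taking Flink's Java CoGroupFunction.
A self-contained sketch of that effect with hypothetical stand-in types, not Flink's API:

    import java.util.function.Consumer

    // Stand-in for a Java SAM interface such as CoGroupFunction.
    trait CoGroupFun[L, R, O] {
      def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], out: Consumer[O]): Unit
    }

    // Stand-in for a result type whose apply is overloaded for both forms.
    class CoGroupedSets[L, R] {
      def apply[O](fun: (Iterator[L], Iterator[R], Consumer[O]) => Unit): Unit = ()
      def apply[O](fun: CoGroupFun[L, R, O]): Unit = ()
    }

    object SamAmbiguitySketch {
      val sets = new CoGroupedSets[(Long, Long), (Long, Long)]

      // sets { (prev, next, out) => () }   // 2.12: parameter types cannot be inferred, does not compile

      sets { (prev: Iterator[(Long, Long)],
              next: Iterator[(Long, Long)],
              out: Consumer[(Long, Long)]) => () }   // explicit types select the function overload
    }
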
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 7ed39c94281..f068ed34727 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -118,8 +118,11 @@ object WebLogAnalysis {
     }.withForwardedFieldsSecond("*")
 
     val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
-      (ranks, visits, out: Collector[(Int, String, Int)]) =>
-        if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
+      (
+        ranks: Iterator[(Int, String, Int)],
+        visits: Iterator[(String, String)],
+        out: Collector[(Int, String, Int)]) =>
+          if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
     }.withForwardedFieldsFirst("*")
 
     // emit result
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
index ecbf7c5e98b..1fa3ace4fad 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
  */
 object IterateExample {
 
-  private val Bound = 100
+  private final val Bound = 100
 
   def main(args: Array[String]): Unit = {
     // Checking input parameters
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 9d844c951c6..399524b4ea5 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -64,8 +64,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-table_2.11</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-table. -->
@@ -76,25 +75,22 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.11</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
 
-		<!-- To avoid having to have the 'flink-avro' project dependent on a particular
-			Scala version, we hard-refer the flink-streaming-scala_2.11 here -->
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_2.11</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.11</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -106,6 +102,15 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
+		<!-- We need this for the patched FlinkScalaKryoInstantiator -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<profiles>
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index 0cb3f8ddd2b..062dcb6d781 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -53,8 +53,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-table_2.11</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-table. -->
@@ -70,17 +69,23 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-table_2.11</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
 
-		<!-- flink-table needs Scala -->
+		<!-- flink-table needs these -->
 		<dependency>
 			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-compiler</artifactId>
+			<artifactId>scala-library</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<!-- the reason things normally work without declaring this explicitly is that it is a
+		transitive dependency of flink-runtime via Akka -->
+		<dependency>
+			<groupId>org.scala-lang.modules</groupId>
+			<artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 	</dependencies>
diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index e338d054ec0..b1958013513 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -96,14 +96,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.11</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.11</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
@@ -117,7 +117,6 @@ under the License.
 
 	</dependencies>
 
-
 	<build>
 		<plugins>
 			<!-- Generate Test class from avro schema -->
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index d5745131c1b..33d24de7748 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -24,6 +24,7 @@ import org.apache.flink.cep.Event
 import org.apache.flink.cep.SubEvent
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy
 import org.apache.flink.cep.pattern.conditions._
+import org.apache.flink.cep.scala.conditions.Context
 
 class PatternTest {
 
@@ -81,9 +82,9 @@ class PatternTest {
   def testStrictContiguityWithCondition(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("next")
-      .where((value: Event, _) => value.getName == "foobar")
+      .where((value: Event, _: Context[Event]) => value.getName == "foobar")
       .next("end")
-      .where((value: Event, _) => value.getId == 42)
+      .where((value: Event, _: Context[Event]) => value.getId == 42)
 
     val jPattern = JPattern.begin[Event]("start")
       .next("next")
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 27bc548d742..b006bceb4c1 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -109,8 +109,8 @@ object Graph {
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
-    val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+    val javaTupleVertices = vertices.map((v: (K, VV)) => new jtuple.Tuple2(v._1, v._2)).javaSet
+    val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
     wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
       env.getJavaEnv))
   }
@@ -124,7 +124,7 @@ object Graph {
    */
   def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
   (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
-    val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+    val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
     wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv))
   }
 
@@ -139,7 +139,7 @@ object Graph {
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](edges: DataSet[(K, K, EV)],
   vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+    val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
     wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, vertexValueInitializer,
       env.getJavaEnv))
   }
@@ -152,7 +152,7 @@ object Graph {
    */
   def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)],
   env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
-    val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+    val javaTupleEdges = edges.map((v: (K, K)) => new jtuple.Tuple2(v._1, v._2)).javaSet
     wrapGraph(jg.Graph.fromTuple2DataSet[K](javaTupleEdges, env.getJavaEnv))
   }
 
@@ -166,7 +166,7 @@ object Graph {
   def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag]
   (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV],
   env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
-    val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+    val javaTupleEdges = edges.map((v: (K, K)) => new jtuple.Tuple2(v._1, v._2)).javaSet
     wrapGraph(jg.Graph.fromTuple2DataSet[K, VV](javaTupleEdges, vertexValueInitializer,
       env.getJavaEnv))
   }
@@ -259,7 +259,7 @@ object Graph {
         ignoreCommentsEdges,
         lenientEdges,
         includedFieldsEdges)
-        .map(edge => (edge._1, edge._2, NullValue.getInstance))
+        .map((edge: (K, K)) => (edge._1, edge._2, NullValue.getInstance))
         .asInstanceOf[DataSet[(K, K, EV)]]
     } else {
       env.readCsvFile[(K, K, EV)](
@@ -331,14 +331,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @return the vertex DataSet as Tuple2.
    */
   def getVerticesAsTuple2(): DataSet[(K, VV)] = {
-    wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
+    wrap(jgraph.getVerticesAsTuple2).map(
+      (v: jtuple.Tuple2[K, VV]) => (v.f0, v.f1))
   }
 
   /**
    * @return the edge DataSet as Tuple3.
    */
   def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
-    wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
+    wrap(jgraph.getEdgesAsTuple3).map(
+      (e: jtuple.Tuple3[K, K, EV]) =>
+        (e.f0, e.f1, e.f2))
   }
 
   /**
@@ -508,8 +511,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    */
   def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
-      scalatuple._2)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, T)) => new jtuple.Tuple2(i._1, i._2)).javaSet
     wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, vertexJoinFunction))
   }
 
@@ -537,8 +540,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
         cleanFun(vertexValue, inputValue)
       }
     }
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
-      scalatuple._2)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, T)) => new jtuple.Tuple2(i._1, i._2)).javaSet
     wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newVertexJoin))
   }
 
@@ -559,8 +562,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    */
   def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
-      scalatuple._2, scalatuple._3)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, K, T)) => new jtuple.Tuple3(i._1, i._2, i._3)).javaSet
     wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, edgeJoinFunction))
   }
 
@@ -588,8 +591,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
         cleanFun(edgeValue, inputValue)
       }
     }
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
-      scalatuple._2, scalatuple._3)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, K, T)) => new jtuple.Tuple3(i._1, i._2, i._3)).javaSet
     wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newEdgeJoin))
   }
 
@@ -611,8 +614,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    */
   def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
-      scalatuple._2)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, T)) => new jtuple.Tuple2(i._1, i._2)).javaSet
     wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, edgeJoinFunction))
   }
 
@@ -641,8 +644,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
         cleanFun(edgeValue, inputValue)
       }
     }
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
-      scalatuple._2)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, T)) => new jtuple.Tuple2(i._1, i._2)).javaSet
     wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newEdgeJoin))
   }
 
@@ -664,8 +667,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    */
   def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
   edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
-      scalatuple._2)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, T)) => new jtuple.Tuple2(i._1, i._2)).javaSet
     wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, edgeJoinFunction))
   }
 
@@ -694,8 +697,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
         cleanFun(edgeValue, inputValue)
       }
     }
-    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
-      scalatuple._2)).javaSet
+    val javaTupleSet = inputDataSet.map(
+      (i: (K, T)) => new jtuple.Tuple2(i._1, i._2)).javaSet
     wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newEdgeJoin))
   }
 
@@ -799,7 +802,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @return A DataSet of Tuple2<vertexId, inDegree>
    */
   def inDegrees(): DataSet[(K, LongValue)] = {
-    wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+    wrap(jgraph.inDegrees).map((i: jtuple.Tuple2[K, LongValue]) => (i.f0, i.f1))
   }
 
   /**
@@ -808,7 +811,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @return A DataSet of Tuple2<vertexId, outDegree>
    */
   def outDegrees(): DataSet[(K, LongValue)] = {
-    wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+    wrap(jgraph.outDegrees).map((i: jtuple.Tuple2[K, LongValue]) => (i.f0, i.f1))
   }
 
   /**
@@ -817,7 +820,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @return A DataSet of Tuple2<vertexId, degree>
    */
   def getDegrees(): DataSet[(K, LongValue)] = {
-    wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+    wrap(jgraph.getDegrees).map((i: jtuple.Tuple2[K, LongValue]) => (i.f0, i.f1))
   }
 
   /**
@@ -927,7 +930,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @return The IDs of the edges as DataSet
    */
   def getEdgeIds(): DataSet[(K, K)] = {
-    wrap(jgraph.getEdgeIds).map(jtuple => (jtuple.f0, jtuple.f1))
+    wrap(jgraph.getEdgeIds).map((i: jtuple.Tuple2[K, K]) => (i.f0, i.f1))
   }
 
   /**
@@ -1083,8 +1086,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    */
   def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction:
   EdgeDirection): DataSet[(K, VV)] = {
-    wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction)).map(jtuple => (jtuple
-      .f0, jtuple.f1))
+    wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction)).map(
+      (i: jtuple.Tuple2[K, VV]) => (i.f0, i.f1))
   }
 
   /**
@@ -1102,8 +1105,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    */
   def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
   DataSet[(K, EV)] = {
-    wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,
-      jtuple.f1))
+    wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(
+      (i: jtuple.Tuple2[K, EV]) => (i.f0, i.f1))
   }
 
   /**
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index a25b4b9dbc9..14b10791fa6 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -47,7 +47,7 @@
 		<dependency>
 			<groupId>org.scalanlp</groupId>
 			<artifactId>breeze_${scala.binary.version}</artifactId>
-			<version>0.12</version>
+			<version>0.13</version>
 		</dependency>
 
 		<!-- the dependencies below are already provided in Flink -->
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
index bfc72a4fcb1..4bb21d5d997 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
@@ -376,7 +376,7 @@ object FlinkMLTools {
     partitionerOption: Option[Partitioner[Int]] = None)
   : DataSet[Block[T]] = {
     val blockIDInput = input map {
-      element =>
+      element: T =>
         val blockID = element.hashCode() % numBlocks
 
         val blockIDResult = if(blockID < 0){
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
index 527e6365ff6..68787035fa2 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
@@ -227,7 +227,8 @@ object KNN {
 
             // join input and training set
             val crossed = crossTuned.mapPartition {
-              (iter, out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => {
+              (iter: Iterator[(Block[FlinkVector], Block[(Long, T)])],
+               out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => {
                 for ((training, testing) <- iter) {
                   // use a quadtree if (4 ^ dim) * Ntest * log(Ntrain)
                   // < Ntest * Ntrain, and distance is Euclidean
@@ -247,12 +248,13 @@ object KNN {
                     knnQueryBasic(training.values, testing.values, k, metric, out)
                   }
                 }
-              }
+                }
             }
 
             // group by input vector id and pick k nearest neighbor for each group
             val result = crossed.groupBy(2).sortGroup(3, Order.ASCENDING).reduceGroup {
-              (iter, out: Collector[(FlinkVector, Array[FlinkVector])]) => {
+              (iter: Iterator[(FlinkVector, FlinkVector, Long, Double)],
+               out: Collector[(FlinkVector, Array[FlinkVector])]) => {
                 if (iter.hasNext) {
                   val head = iter.next()
                   val key = head._2
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
index ee82c03f484..2ff46dbcfdf 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
@@ -197,7 +197,7 @@ object StochasticOutlierSelection extends WithParameters {
         val resultingParameters = instance.parameters ++ transformParameters
 
         // Map to the right format
-        val vectorsWithIndex = input.zipWithUniqueId.map(vector => {
+        val vectorsWithIndex = input.zipWithUniqueId.map((vector: (Long, T)) => {
           BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
         })
 
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
index 217e2c26fed..b37748fb09c 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
@@ -149,7 +149,7 @@ object MinMaxScaler {
   : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
 
     val minMax = dataSet.map {
-      v => (v.asBreeze, v.asBreeze)
+      v: T => (v.asBreeze, v.asBreeze)
     }.reduce {
       (minMax1, minMax2) => {
 
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
index f1c788e4faf..977428d9531 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
@@ -116,7 +116,7 @@ object PolynomialFeatures{
         val degree = resultingParameters(Degree)
 
         input.map {
-          vector => {
+          vector: T => {
             calculatePolynomial(degree, vector)
           }
         }
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
index 3451c809235..c8bf0e74c19 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
@@ -76,7 +76,7 @@ object Splitter {
       }
     }
 
-    val leftSplitLight = leftSplit.map(o => (o._1, false))
+    val leftSplitLight = leftSplit.map((o: (Long, T)) => (o._1, false))
 
     val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, Boolean)](leftSplitLight)
       .where(0)
@@ -87,7 +87,7 @@ object Splitter {
         }
     }
 
-    Array(leftSplit.map(o => o._2), rightSplit)
+    Array(leftSplit.map((o: (Long, T)) => o._2), rightSplit)
   }
 
   // --------------------------------------------------------------------------------------------
@@ -117,14 +117,14 @@ object Splitter {
 
     eid.reseedRandomGenerator(seed)
 
-    val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o))
+    val tempDS: DataSet[(Int,T)] = input.map((o: T) => (eid.sample, o))
 
     val splits = fracArray.length
     val outputArray = new Array[DataSet[T]]( splits )
 
     for (k <- 0 to splits-1){
-      outputArray(k) = tempDS.filter(o => o._1 == k)
-                             .map(o => o._2)
+      outputArray(k) = tempDS.filter((o: (Int, T)) => o._1 == k)
+                             .map((o: (Int, T)) => o._2)
     }
 
     outputArray
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 82e8abf4df1..aa38f411059 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -158,7 +158,7 @@ object StandardScaler {
           fitParameters: ParameterMap,
           input: DataSet[(T, Double)])
       : Unit = {
-        val vectorDS = input.map(_._1)
+        val vectorDS = input.map( (i: (T, Double)) => i._1)
         val metrics = extractFeatureMetrics(vectorDS)
 
         instance.metricsOption = Some(metrics)
@@ -180,7 +180,7 @@ object StandardScaler {
   private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T])
   : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
     val metrics = dataSet.map{
-      v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
+      v: T => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
     }.reduce{
       (metrics1, metrics2) => {
         /* We use formula 1.5b of the cited technical report for the combination of partial
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index 68ec71857bb..dc1e86d14a1 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -45,22 +45,19 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-clients_2.11</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-streaming-scala_2.11</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-table_2.11</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -107,8 +104,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-runtime_2.11</artifactId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -116,16 +112,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-test-utils_2.11</artifactId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<!-- use a dedicated Scala version to not depend on it -->
-			<artifactId>flink-clients_2.11</artifactId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
@@ -135,6 +129,7 @@ under the License.
 
 	<build>
 		<plugins>
+
 			<!-- Build flink-sql-client jar -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index e7b79a5a196..3b08b6442d6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -142,7 +142,7 @@ class SortITCase extends StreamingWithStateTestBase {
 object SortITCase {
 
   final class StringRowSelectorSink(private val field:Int) extends RichSinkFunction[Row]() {
-    def invoke(value: Row) {
+    override def invoke(value: Row) {
       testResults.synchronized {
         testResults += value.getField(field).toString
       }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamITCase.scala
index 5386b40dd0a..a75c5d930ea 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamITCase.scala
@@ -44,7 +44,7 @@ object StreamITCase {
   }
 
   final class StringSink[T] extends RichSinkFunction[T]() {
-    def invoke(value: T) {
+    override def invoke(value: T) {
       testResults.synchronized {
         testResults += value.toString
       }
@@ -52,7 +52,7 @@ object StreamITCase {
   }
 
   final class RetractMessagesSink extends RichSinkFunction[(Boolean, Row)]() {
-    def invoke(v: (Boolean, Row)) {
+    override def invoke(v: (Boolean, Row)) {
       testResults.synchronized {
         testResults += (if (v._1) "+" else "-") + v._2
       }
@@ -60,7 +60,7 @@ object StreamITCase {
   }
 
   final class RetractingSink() extends RichSinkFunction[(Boolean, Row)] {
-    def invoke(v: (Boolean, Row)) {
+    override def invoke(v: (Boolean, Row)) {
       retractedResults.synchronized {
         val value = v._2.toString
         if (v._1) {
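Adding override here is presumably required because SinkFunction.invoke is a Java default
method, which Scala 2.12 (unlike 2.11) treats as an inherited concrete member. The general
rule, sketched with a Scala trait standing in for the Java interface:

    // A Scala trait stands in for the Java SinkFunction interface and its default invoke method.
    trait SinkLike[T] {
      def invoke(value: T): Unit = ()   // concrete inherited member
    }

    final class CollectingSink extends SinkLike[String] {
      // redefining a concrete member without the override modifier is rejected by the compiler
      override def invoke(value: String): Unit = println(value)
    }
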
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
index a4ae2bbbea4..3b175d31c00 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
@@ -21,18 +21,19 @@ package org.apache.flink.mesos.scheduler
 import java.util.{Collections, UUID}
 import java.util.concurrent.atomic.AtomicReference
 
+import akka.actor.ActorSystem
 import akka.actor.FSM.StateTimeout
 import akka.testkit._
 import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest}
 import com.netflix.fenzo._
 import com.netflix.fenzo.functions.{Action1, Action2}
-import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.mesos.scheduler.LaunchCoordinator._
 import org.apache.flink.mesos.scheduler.messages._
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.mesos.Protos.{SlaveID, TaskInfo}
-import org.apache.mesos.{SchedulerDriver, Protos}
+import org.apache.mesos.{Protos, SchedulerDriver}
 import org.junit.runner.RunWith
 import org.mockito.Mockito.{verify, _}
 import org.mockito.invocation.InvocationOnMock
@@ -56,8 +57,8 @@ class LaunchCoordinatorTest
     with Matchers
     with BeforeAndAfterAll {
 
-  lazy val config = new Configuration()
-  implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+  lazy val config: Configuration = new Configuration()
+  implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config)
 
   override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
index c22385204ce..8e97613e259 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
@@ -20,14 +20,14 @@ package org.apache.flink.mesos.scheduler
 
 import java.util.UUID
 
-import akka.actor.FSM
+import akka.actor.{ActorSystem, FSM}
 import akka.testkit._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.mesos.Matchers._
 import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.mesos.Protos.TaskState._
-import org.apache.mesos.{SchedulerDriver, Protos}
+import org.apache.mesos.{Protos, SchedulerDriver}
 import org.junit.runner.RunWith
 import org.mockito.Mockito
 import org.mockito.Mockito._
@@ -44,8 +44,8 @@ class ReconciliationCoordinatorTest
 
   import ReconciliationCoordinator._
 
-  lazy val config = new Configuration()
-  implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+  lazy val config: Configuration = new Configuration()
+  implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config)
 
   override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
index b4ef93837c1..564a3e3f4ec 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
@@ -20,14 +20,15 @@ package org.apache.flink.mesos.scheduler
 
 import java.util.UUID
 
+import akka.actor.ActorSystem
 import akka.actor.FSM.StateTimeout
 import akka.testkit._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.mesos.TestFSMUtils
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
-import org.apache.flink.mesos.scheduler.messages.{Disconnected, Connected, StatusUpdate}
+import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.mesos.{SchedulerDriver, Protos}
+import org.apache.mesos.{Protos, SchedulerDriver}
 import org.apache.mesos.Protos.TaskState._
 import org.junit.runner.RunWith
 import org.mockito.Mockito
@@ -45,8 +46,8 @@ class TaskMonitorTest
 
   import TaskMonitor._
 
-  lazy val config = new Configuration()
-  implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+  lazy val config: Configuration = new Configuration()
+  implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config)
 
   override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
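The implicit ActorSystem values in these Mesos tests now carry explicit type annotations;
giving implicit definitions an explicit result type is the usual recommendation and avoids
inference surprises when the right-hand side changes. A minimal sketch of the pattern, using
a plain Akka fixture rather than the Flink test utilities:

    import akka.actor.ActorSystem

    trait ActorSystemFixture {
      // explicit result type on the implicit definition, as in the tests above
      implicit lazy val system: ActorSystem = ActorSystem("sketch")
    }
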
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/types/FlinkChillPackageRegistrar.java b/flink-runtime/src/main/java/org/apache/flink/runtime/types/FlinkChillPackageRegistrar.java
new file mode 100644
index 00000000000..64f9431dd63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/types/FlinkChillPackageRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.types;
+
+import com.twitter.chill.IKryoRegistrar;
+import com.twitter.chill.java.ArraysAsListSerializer;
+import com.twitter.chill.java.BitSetSerializer;
+import com.twitter.chill.java.InetSocketAddressSerializer;
+import com.twitter.chill.java.IterableRegistrar;
+import com.twitter.chill.java.LocaleSerializer;
+import com.twitter.chill.java.RegexSerializer;
+import com.twitter.chill.java.SimpleDateFormatSerializer;
+import com.twitter.chill.java.SqlDateSerializer;
+import com.twitter.chill.java.SqlTimeSerializer;
+import com.twitter.chill.java.TimestampSerializer;
+import com.twitter.chill.java.URISerializer;
+import com.twitter.chill.java.UUIDSerializer;
+
+/**
+ * Creates a registrar for all the serializers in the chill.java package.
+ */
+public class FlinkChillPackageRegistrar {
+
+	public static IKryoRegistrar all() {
+		return new IterableRegistrar(
+				ArraysAsListSerializer.registrar(),
+				BitSetSerializer.registrar(),
+				PriorityQueueSerializer.registrar(),
+				RegexSerializer.registrar(),
+				SqlDateSerializer.registrar(),
+				SqlTimeSerializer.registrar(),
+				TimestampSerializer.registrar(),
+				URISerializer.registrar(),
+				InetSocketAddressSerializer.registrar(),
+				UUIDSerializer.registrar(),
+				LocaleSerializer.registrar(),
+				SimpleDateFormatSerializer.registrar());
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/types/PriorityQueueSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/types/PriorityQueueSerializer.java
new file mode 100644
index 00000000000..11481719a32
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/types/PriorityQueueSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.types;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.twitter.chill.IKryoRegistrar;
+import com.twitter.chill.SingleRegistrar;
+
+import java.lang.reflect.Field;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+class PriorityQueueSerializer extends Serializer<PriorityQueue<?>> {
+	private Field compField;
+
+	public static IKryoRegistrar registrar() {
+		return new SingleRegistrar(PriorityQueue.class, new PriorityQueueSerializer());
+	}
+
+	public PriorityQueueSerializer() {
+		try {
+			compField = PriorityQueue.class.getDeclaredField("comparator");
+			compField.setAccessible(true);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public Comparator<?> getComparator(PriorityQueue<?> q) {
+		try {
+			return (Comparator<?>) compField.get(q);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void write(Kryo k, Output o, PriorityQueue<?> q) {
+		k.writeClassAndObject(o, getComparator(q));
+		o.writeInt(q.size(), true);
+		for (Object a : q) {
+			k.writeClassAndObject(o, a);
+			o.flush();
+		}
+	}
+
+	public PriorityQueue<?> read(Kryo k, Input i, Class<PriorityQueue<?>> c) {
+		Comparator<Object> comp = (Comparator<Object>) k.readClassAndObject(i);
+		int sz = i.readInt(true);
+		// can't create with size 0:
+		PriorityQueue<Object> result;
+		if (sz == 0) {
+			result = new PriorityQueue<Object>(1, comp);
+		}
+		else {
+			result = new PriorityQueue<Object>(sz, comp);
+		}
+		int idx = 0;
+		while (idx < sz) {
+			result.add(k.readClassAndObject(i));
+			idx += 1;
+		}
+		return result;
+	}
+}
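A hedged round-trip sketch for the serializer above, going through the public
FlinkChillPackageRegistrar (which wires in PriorityQueueSerializer.registrar()); the buffer
size and values are illustrative only:

    import java.util.PriorityQueue

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.flink.runtime.types.FlinkChillPackageRegistrar

    object PriorityQueueRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()
        FlinkChillPackageRegistrar.all()(kryo)   // registers PriorityQueueSerializer among others

        val queue = new PriorityQueue[Integer]()
        queue.add(3); queue.add(1); queue.add(2)

        val output = new Output(4096)
        kryo.writeClassAndObject(output, queue)
        output.flush()

        val copy = kryo.readClassAndObject(new Input(output.toBytes))
          .asInstanceOf[PriorityQueue[Integer]]
        println(copy.peek())   // 1, the natural-order head survives the round trip
      }
    }
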
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 2008ad87d56..bfb9ccb04f3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1412,8 +1412,8 @@ class TaskManager(
             accumulatorEvents.append(accumulators)
           } catch {
             case e: Exception =>
-              log.warn("Failed to take accumulator snapshot for task {}.",
-                execID, ExceptionUtils.getRootCause(e))
+              log.warn(s"Failed to take accumulator snapshot for task $execID.",
+                ExceptionUtils.getRootCause(e))
           }
       }
 
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala
new file mode 100644
index 00000000000..fc9903cc33d
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.types
+
+import scala.collection.immutable.{BitSet, HashMap, HashSet, ListMap, ListSet, NumericRange, Queue, Range, SortedMap, SortedSet}
+import scala.collection.mutable.{Buffer, ListBuffer, WrappedArray, BitSet => MBitSet, HashMap => MHashMap, HashSet => MHashSet, Map => MMap, Queue => MQueue, Set => MSet}
+import scala.util.matching.Regex
+import _root_.java.io.Serializable
+
+import com.twitter.chill._
+
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * This class has a no-arg constructor, suitable for use with reflection instantiation.
+ * It has no registered serializers, just the standard Kryo configuration.
+ */
+class EmptyFlinkScalaKryoInstantiator extends KryoInstantiator {
+  override def newKryo = {
+    val k = new KryoBase
+    k.setRegistrationRequired(false)
+    k.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy)
+
+    // Handle cases where we may have an odd classloader setup like with libjars
+    // for hadoop
+    val classLoader = Thread.currentThread.getContextClassLoader
+    k.setClassLoader(classLoader)
+
+    k
+  }
+}
+
+object FlinkScalaKryoInstantiator extends Serializable {
+  private val mutex = new AnyRef with Serializable // some serializable object
+  @transient private var kpool: KryoPool = null
+
+  /**
+   * Return a KryoPool that uses the FlinkScalaKryoInstantiator
+   */
+  def defaultPool: KryoPool = mutex.synchronized {
+    if (null == kpool) {
+      kpool = KryoPool.withByteArrayOutputStream(guessThreads, new FlinkScalaKryoInstantiator)
+    }
+    kpool
+  }
+
+  private def guessThreads: Int = {
+    val cores = Runtime.getRuntime.availableProcessors
+    val GUESS_THREADS_PER_CORE = 4
+    GUESS_THREADS_PER_CORE * cores
+  }
+}
+
+/** Makes an empty instantiator then registers everything */
+class FlinkScalaKryoInstantiator extends EmptyFlinkScalaKryoInstantiator {
+  override def newKryo = {
+    val k = super.newKryo
+    val reg = new AllScalaRegistrar
+    reg(k)
+    k
+  }
+}
+
+class ScalaCollectionsRegistrar extends IKryoRegistrar {
+  def apply(newK: Kryo) {
+    // for binary compat this is here, but could be moved to RichKryo
+    def useField[T](cls: Class[T]) {
+      val fs = new com.esotericsoftware.kryo.serializers.FieldSerializer(newK, cls)
+      fs.setIgnoreSyntheticFields(false) // scala generates a lot of these attributes
+      newK.register(cls, fs)
+    }
+    // The wrappers are private classes:
+    useField(List(1, 2, 3).asJava.getClass)
+    useField(List(1, 2, 3).iterator.asJava.getClass)
+    useField(Map(1 -> 2, 4 -> 3).asJava.getClass)
+    useField(new _root_.java.util.ArrayList().asScala.getClass)
+    useField(new _root_.java.util.HashMap().asScala.getClass)
+
+    /*
+     * Note: for subclass-based registration use addDefaultSerializers, otherwise use register.
+     * You should go from MOST specific to least specific when using
+     * default serializers. The FIRST one found is the one used.
+     */
+    newK
+      // wrapper array is abstract
+      .forSubclass[WrappedArray[Any]](new WrappedArraySerializer[Any])
+      .forSubclass[BitSet](new BitSetSerializer)
+      .forSubclass[SortedSet[Any]](new SortedSetSerializer)
+      .forClass[Some[Any]](new SomeSerializer[Any])
+      .forClass[Left[Any, Any]](new LeftSerializer[Any, Any])
+      .forClass[Right[Any, Any]](new RightSerializer[Any, Any])
+      .forTraversableSubclass(Queue.empty[Any])
+      // List is a sealed class, so there are only two subclasses:
+      .forTraversableSubclass(List.empty[Any])
+      // Add ListBuffer subclass before Buffer to prevent the more general case taking precedence
+      .forTraversableSubclass(ListBuffer.empty[Any], isImmutable = false)
+      // add mutable Buffer before Vector, otherwise Vector is used
+      .forTraversableSubclass(Buffer.empty[Any], isImmutable = false)
+      // Vector is a final class
+      .forTraversableClass(Vector.empty[Any])
+      .forTraversableSubclass(ListSet.empty[Any])
+      // specifically register small sets since Scala represents them differently
+      .forConcreteTraversableClass(Set[Any]('a))
+      .forConcreteTraversableClass(Set[Any]('a, 'b))
+      .forConcreteTraversableClass(Set[Any]('a, 'b, 'c))
+      .forConcreteTraversableClass(Set[Any]('a, 'b, 'c, 'd))
+      // default set implementation
+      .forConcreteTraversableClass(HashSet[Any]('a, 'b, 'c, 'd, 'e))
+      // specifically register small maps since Scala represents them differently
+      .forConcreteTraversableClass(Map[Any, Any]('a -> 'a))
+      .forConcreteTraversableClass(Map[Any, Any]('a -> 'a, 'b -> 'b))
+      .forConcreteTraversableClass(Map[Any, Any]('a -> 'a, 'b -> 'b, 'c -> 'c))
+      .forConcreteTraversableClass(Map[Any, Any]('a -> 'a, 'b -> 'b, 'c -> 'c, 'd -> 'd))
+      // default map implementation
+      .forConcreteTraversableClass(
+        HashMap[Any, Any]('a -> 'a, 'b -> 'b, 'c -> 'c, 'd -> 'd, 'e -> 'e))
+      // The normal fields serializer works for ranges
+      .registerClasses(Seq(classOf[Range.Inclusive],
+      classOf[NumericRange.Inclusive[_]],
+      classOf[NumericRange.Exclusive[_]]))
+      // Add some maps
+      .forSubclass[SortedMap[Any, Any]](new SortedMapSerializer)
+      .forTraversableSubclass(ListMap.empty[Any, Any])
+      .forTraversableSubclass(HashMap.empty[Any, Any])
+      // The above ListMap/HashMap must appear before this:
+      .forTraversableSubclass(Map.empty[Any, Any])
+      // here are the mutable ones:
+      .forTraversableClass(MBitSet.empty, isImmutable = false)
+      .forTraversableClass(MHashMap.empty[Any, Any], isImmutable = false)
+      .forTraversableClass(MHashSet.empty[Any], isImmutable = false)
+      .forTraversableSubclass(MQueue.empty[Any], isImmutable = false)
+      .forTraversableSubclass(MMap.empty[Any, Any], isImmutable = false)
+      .forTraversableSubclass(MSet.empty[Any], isImmutable = false)
+  }
+}
+
+class JavaWrapperCollectionRegistrar extends IKryoRegistrar {
+  def apply(newK: Kryo) {
+    newK.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
+  }
+}
+
+/** Registers all the scala (and java) serializers we have */
+class AllScalaRegistrar extends IKryoRegistrar {
+  def apply(k: Kryo) {
+    val col = new ScalaCollectionsRegistrar
+    col(k)
+
+    val jcol = new JavaWrapperCollectionRegistrar
+    jcol(k)
+
+    // Register all 22 tuple serializers and specialized serializers
+    ScalaTupleSerialization.register(k)
+    k.forClass[Symbol](new KSerializer[Symbol] {
+      override def isImmutable = true
+      def write(k: Kryo, out: Output, obj: Symbol) { out.writeString(obj.name) }
+      def read(k: Kryo, in: Input, cls: Class[Symbol]) = Symbol(in.readString)
+    })
+      .forSubclass[Regex](new RegexSerializer)
+      .forClass[ClassManifest[Any]](new ClassManifestSerializer[Any])
+      .forSubclass[Manifest[Any]](new ManifestSerializer[Any])
+      .forSubclass[scala.Enumeration#Value](new EnumerationSerializer)
+
+    // use the singleton serializer for boxed Unit
+    val boxedUnit = scala.Unit.box(())
+    k.register(boxedUnit.getClass, new SingletonSerializer(boxedUnit))
+    FlinkChillPackageRegistrar.all()(k)
+  }
+}
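
For orientation, a minimal sketch (not part of the patch) of how the pooled instantiator above can be used from user or test code. It assumes the class lives in org.apache.flink.runtime.types like the neighbouring files, and that chill's KryoPool exposes toBytesWithClass/fromBytes as in current chill releases:

    import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

    object KryoRoundTripSketch {
      def main(args: Array[String]): Unit = {
        // Shared, lazily created pool (sized to roughly 4 threads per core).
        val pool = FlinkScalaKryoInstantiator.defaultPool

        // Serialize an immutable Scala map; the AllScalaRegistrar registers the
        // small-map representations explicitly, so this needs no extra setup.
        val original = Map("a" -> 1, "b" -> 2)
        val bytes = pool.toBytesWithClass(original)

        // Deserialize and compare.
        val copy = pool.fromBytes(bytes)
        assert(copy == original)
      }
    }
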
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/types/JavaIterableWrapperSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/types/JavaIterableWrapperSerializer.scala
new file mode 100644
index 00000000000..a547581d9b3
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/types/JavaIterableWrapperSerializer.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.types
+
+import _root_.java.lang.{Iterable => JIterable}
+import com.twitter.chill.{Input, KSerializer, Kryo, Output}
+
+/**
+ * A Kryo serializer for serializing results returned by asJavaIterable.
+ *
+ * The underlying object is scala.collection.convert.Wrappers$IterableWrapper.
+ * Kryo deserializes this into an AbstractCollection, which unfortunately doesn't work.
+ *
+ * Ported from Apache Spark's KryoSerializer.scala.
+ */
+private class JavaIterableWrapperSerializer extends KSerializer[JIterable[_]] {
+
+  import JavaIterableWrapperSerializer._
+
+  override def write(kryo: Kryo, out: Output, obj: JIterable[_]): Unit = {
+    // If the object is the wrapper, simply serialize the underlying Scala Iterable object.
+    // Otherwise, serialize the object itself.
+    if (obj.getClass == wrapperClass && underlyingMethodOpt.isDefined) {
+      kryo.writeClassAndObject(out, underlyingMethodOpt.get.invoke(obj))
+    } else {
+      kryo.writeClassAndObject(out, obj)
+    }
+  }
+
+  override def read(kryo: Kryo, in: Input, clz: Class[JIterable[_]]): JIterable[_] = {
+    kryo.readClassAndObject(in) match {
+      case scalaIterable: Iterable[_] =>
+        scala.collection.JavaConversions.asJavaIterable(scalaIterable)
+      case javaIterable: JIterable[_] =>
+        javaIterable
+    }
+  }
+}
+
+private object JavaIterableWrapperSerializer {
+  // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper).
+  val wrapperClass = scala.collection.JavaConversions.asJavaIterable(Seq(1)).getClass
+
+  // Get the underlying method so we can use it to get the Scala collection for serialization.
+  private val underlyingMethodOpt = {
+    try Some(wrapperClass.getDeclaredMethod("underlying")) catch {
+      case e: Exception =>
+        None
+    }
+  }
+}
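
A side note on the wrapper class targeted above (illustrative only): the private wrapperClass is exactly what asJavaIterable returns, e.g.

    // The runtime class here is scala.collection.convert.Wrappers$IterableWrapper,
    // which Kryo would otherwise deserialize into an unusable AbstractCollection.
    val javaView: java.lang.Iterable[Int] =
      scala.collection.JavaConversions.asJavaIterable(Seq(1, 2, 3))
    println(javaView.getClass.getName)
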
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
index 8e5b1b9b392..dc9e8f5982a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -56,6 +56,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.util.Collection;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -152,9 +153,11 @@ public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
 
 				Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
 
+				//noinspection RedundantCast
 				final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
-					((ExtendedActorSystem) system),
-					() -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+						((ExtendedActorSystem) system),
+						// we need the explicit cast to disambiguate the function call
+						(Callable<SubmittedJobGraph>) () -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
 
 				assertThat(recoveredJobGraph, is(notNullValue()));
 
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index ebe46399395..355994f1c53 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -24,9 +24,8 @@ import java.util.function.BiFunction
 import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
-import org.apache.flink.core.fs.FSDataInputStream
+import org.apache.flink.api.common.accumulators.Accumulator
 import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
@@ -39,10 +38,11 @@ import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
-import org.apache.flink.runtime.state.{StateBackend, StateBackendLoader, StreamStateHandle}
+import org.apache.flink.runtime.state.StateBackendLoader
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages._
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import org.apache.flink.util.OptionalFailure
 
 import scala.collection.mutable
 import scala.concurrent.Future
@@ -75,7 +75,7 @@ trait TestingJobManagerLike extends FlinkActor {
   val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
     new Ordering[(Int, ActorRef)] {
       override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
-    })
+    }).result()
 
   val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
 
@@ -241,7 +241,10 @@ trait TestingJobManagerLike extends FlinkActor {
         case (jobID, (updated, actors)) if updated =>
           currentJobs.get(jobID) match {
             case Some((graph, jobInfo)) =>
-              val userAccumulators = graph.aggregateUserAccumulators
+              val userAccumulators: java.util.Map[String, OptionalFailure[Accumulator[_, _]]] =
+                graph
+                  .aggregateUserAccumulators
+                  .asInstanceOf[java.util.Map[String, OptionalFailure[Accumulator[_, _]]]]
               actors foreach {
                  actor => actor ! UpdatedAccumulators(jobID, userAccumulators)
               }
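
The added .result() call reflects a Scala 2.12 library change: mutable.PriorityQueue.newBuilder now returns a Builder rather than the queue itself. A minimal sketch of the pattern, assuming only the standard library:

    import scala.collection.mutable

    // In 2.12, newBuilder returns a Builder[A, PriorityQueue[A]], so the queue is
    // obtained by calling result() on it (in 2.11 newBuilder returned the queue directly).
    val ord = Ordering.by[(Int, String), Int](_._1)
    val queue: mutable.PriorityQueue[(Int, String)] =
      mutable.PriorityQueue.newBuilder[(Int, String)](ord).result()
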
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 508a8de9416..e7fc6e13c2b 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -50,6 +50,11 @@ under the License.
 			<artifactId>flink-shaded-asm</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-asm-6</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-reflect</artifactId>
@@ -85,6 +90,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- We need this for the patched FlinkScalaKryoInstantiator -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -171,13 +184,6 @@ under the License.
 						<jvmArg>-Xms128m</jvmArg>
 						<jvmArg>-Xmx512m</jvmArg>
 					</jvmArgs>
-					<compilerPlugins combine.children="append">
-					   <compilerPlugin>
-						   <groupId>org.scalamacros</groupId>
-						   <artifactId>paradise_${scala.version}</artifactId>
-						   <version>${scala.macros.version}</version>
-					   </compilerPlugin>
-				   </compilerPlugins>
 				</configuration>
 			</plugin>
 			
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index 79653466ed5..2932ed50a46 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -18,35 +18,40 @@
 package org.apache.flink.api.scala
 
 import java.io._
+import java.lang.invoke.SerializedLambda
 
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{FlinkException, InstantiationUtil}
 import org.slf4j.LoggerFactory
 
 import scala.collection.mutable.Map
 import scala.collection.mutable.Set
+import org.apache.flink.shaded.asm6.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
+import org.apache.flink.shaded.asm6.org.objectweb.asm.Opcodes._
 
-import org.apache.flink.shaded.asm5.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import org.apache.flink.shaded.asm5.org.objectweb.asm.Opcodes._
+import scala.collection.mutable
 
 /* This code is originally from the Apache Spark project. */
 @Internal
 object ClosureCleaner {
+
   val LOG = LoggerFactory.getLogger(this.getClass)
 
+  private val isScala2_11 = scala.util.Properties.versionString.contains("2.11")
+
   // Get an ASM class reader for a given class from the JAR that loaded it
-  private def getClassReader(cls: Class[_]): ClassReader = {
+  private[scala] def getClassReader(cls: Class[_]): ClassReader = {
     // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
     val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
     val resourceStream = cls.getResourceAsStream(className)
-    // todo: Fixme - continuing with earlier behavior ...
-    if (resourceStream == null) return new ClassReader(resourceStream)
-
-    val baos = new ByteArrayOutputStream(128)
-
-    copyStream(resourceStream, baos, true)
-    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+    if (resourceStream == null) {
+      null
+    } else {
+      val baos = new ByteArrayOutputStream(128)
+      copyStream(resourceStream, baos, true)
+      new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+    }
   }
 
   // Check whether a class represents a Scala closure
@@ -54,110 +59,339 @@ object ClosureCleaner {
     cls.getName.contains("$anonfun$")
   }
 
-  // Get a list of the classes of the outer objects of a given closure object, obj;
+  // Get a list of the outer objects and their classes of a given closure object, obj;
   // the outer objects are defined as any closures that obj is nested within, plus
   // possibly the class that the outermost closure is in, if any. We stop searching
   // for outer objects beyond that because cloning the user's object is probably
   // not a good idea (whereas we can clone closure objects just fine since we
   // understand how all their fields are used).
-  private def getOuterClasses(obj: AnyRef): List[Class[_]] = {
+  private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = {
     for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
       f.setAccessible(true)
-      if (isClosure(f.getType)) {
-        return f.getType :: getOuterClasses(f.get(obj))
-      } else {
-        return f.getType :: Nil // Stop at the first $outer that is not a closure
+      val outer = f.get(obj)
+      // The outer pointer may be null if we have cleaned this closure before
+      if (outer != null) {
+        if (isClosure(f.getType)) {
+          val recurRet = getOuterClassesAndObjects(outer)
+          return (f.getType :: recurRet._1, outer :: recurRet._2)
+        } else {
+          return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
+        }
       }
     }
-    Nil
+    (Nil, Nil)
   }
-
-  // Get a list of the outer objects for a given closure object.
-  private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
-    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
-      f.setAccessible(true)
-      if (isClosure(f.getType)) {
-        return f.get(obj) :: getOuterObjects(f.get(obj))
-      } else {
-        return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
+  /**
+   * Return a list of classes that represent closures enclosed in the given closure object.
+   */
+  private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
+    val seen = Set[Class[_]](obj.getClass)
+    val stack = mutable.Stack[Class[_]](obj.getClass)
+    while (!stack.isEmpty) {
+      val cr = getClassReader(stack.pop())
+      if (cr != null) {
+        val set = Set.empty[Class[_]]
+        cr.accept(new InnerClosureFinder(set), 0)
+        for (cls <- set -- seen) {
+          seen += cls
+          stack.push(cls)
+        }
       }
     }
-    Nil
+    (seen - obj.getClass).toList
   }
 
-  private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
-    val seen = Set[Class[_]](obj.getClass)
-    var stack = List[Class[_]](obj.getClass)
-    while (stack.nonEmpty) {
-      val cr = getClassReader(stack.head)
-      stack = stack.tail
-      val set = Set[Class[_]]()
-      cr.accept(new InnerClosureFinder(set), 0)
-      for (cls <- set -- seen) {
-        seen += cls
-        stack = cls :: stack
+  /** Initializes the accessed fields for outer classes and their super classes. */
+  private def initAccessedFields(
+      accessedFields: Map[Class[_], Set[String]],
+      outerClasses: Seq[Class[_]]): Unit = {
+    for (cls <- outerClasses) {
+      var currentClass = cls
+      assert(currentClass != null, "The outer class can't be null.")
+
+      while (currentClass != null) {
+        accessedFields(currentClass) = Set.empty[String]
+        currentClass = currentClass.getSuperclass()
       }
     }
-    (seen - obj.getClass).toList
   }
 
-  private def createNullValue(cls: Class[_]): AnyRef = {
-    if (cls.isPrimitive) {
-      new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
-    } else {
-      null
+  /** Sets accessed fields for given class in clone object based on given object. */
+  private def setAccessedFields(
+      outerClass: Class[_],
+      clone: AnyRef,
+      obj: AnyRef,
+      accessedFields: Map[Class[_], Set[String]]): Unit = {
+    for (fieldName <- accessedFields(outerClass)) {
+      val field = outerClass.getDeclaredField(fieldName)
+      field.setAccessible(true)
+      val value = field.get(obj)
+      field.set(clone, value)
     }
   }
 
-  def clean(func: AnyRef, checkSerializable: Boolean = true) {
-    // TODO: cache outerClasses / innerClasses / accessedFields
-    val outerClasses = getOuterClasses(func)
-    val innerClasses = getInnerClasses(func)
-    val outerObjects = getOuterObjects(func)
+  /** Clones a given object and sets accessed fields in cloned object. */
+  private def cloneAndSetFields(
+      parent: AnyRef,
+      obj: AnyRef,
+      outerClass: Class[_],
+      accessedFields: Map[Class[_], Set[String]]): AnyRef = {
+    val clone = instantiateClass(outerClass, parent)
 
-    val accessedFields = Map[Class[_], Set[String]]()
+    var currentClass = outerClass
+    assert(currentClass != null, "The outer class can't be null.")
 
-    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+    while (currentClass != null) {
+      setAccessedFields(currentClass, clone, obj, accessedFields)
+      currentClass = currentClass.getSuperclass()
+    }
 
-    for (cls <- outerClasses)
-      accessedFields(cls) = Set[String]()
-    for (cls <- func.getClass :: innerClasses)
-      getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
+    clone
+  }
 
-    if (LOG.isDebugEnabled) {
-      LOG.debug("accessedFields: " + accessedFields)
-    }
+  /**
+   * Clean the given closure in place.
+   *
+   * More specifically, this renders the given closure serializable as long as it does not
+   * explicitly reference unserializable objects.
+   *
+   * @param closure the closure to clean
+   * @param checkSerializable whether to verify that the closure is serializable after cleaning
+   * @param cleanTransitively whether to clean enclosing closures transitively
+   */
+  def clean(
+      closure: AnyRef,
+      checkSerializable: Boolean = true,
+      cleanTransitively: Boolean = true): Unit = {
+    clean(closure, checkSerializable, cleanTransitively, Map.empty)
+  }
 
-    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
-    var outer: AnyRef = null
-    if (outerPairs.nonEmpty && !isClosure(outerPairs.head._1)) {
-      // The closure is ultimately nested inside a class; keep the object of that
-      // class without cloning it since we don't want to clone the user's objects.
-      outer = outerPairs.head._2
-      outerPairs = outerPairs.tail
+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (isScala2_11) {
+      return None
     }
-    // Clone the closure objects themselves, nulling out any fields that are not
-    // used in the closure we're working on or any of its inner closures.
-    for ((cls, obj) <- outerPairs) {
-      outer = instantiateClass(cls, outer)
-      for (fieldName <- accessedFields(cls)) {
-        val field = cls.getDeclaredField(fieldName)
-        field.setAccessible(true)
-        val value = field.get(obj)
-        if (LOG.isDebugEnabled) {
-          LOG.debug("1: Setting " + fieldName + " on " + cls + " to " + value)
-        }
-        field.set(outer, value)
+    val isClosureCandidate =
+      closure.getClass.isSynthetic &&
+        closure
+          .getClass
+          .getInterfaces.exists(_.getName == "scala.Serializable")
+
+    if (isClosureCandidate) {
+      try {
+        Option(inspect(closure))
+      } catch {
+        case e: Exception =>
+          if (LOG.isDebugEnabled) {
+            LOG.debug("Closure is not a serialized lambda.", e)
+          }
+          None
       }
+    } else {
+      None
     }
+  }
+
+  private def inspect(closure: AnyRef): SerializedLambda = {
+    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
+    writeReplace.setAccessible(true)
+    writeReplace.invoke(closure).asInstanceOf[java.lang.invoke.SerializedLambda]
+  }
+
+  /**
+   * Helper method to clean the given closure in place.
+   *
+   * The mechanism is to traverse the hierarchy of enclosing closures and null out any
+   * references along the way that are not actually used by the starting closure, but are
+   * nevertheless included in the compiled anonymous classes. Note that it is unsafe to
+   * simply mutate the enclosing closures in place, as other code paths may depend on them.
+   * Instead, we clone each enclosing closure and set the parent pointers accordingly.
+   *
+   * By default, closures are cleaned transitively. This means we detect whether enclosing
+   * objects are actually referenced by the starting one, either directly or transitively,
+   * and, if not, sever these closures from the hierarchy. In other words, in addition to
+   * nulling out unused field references, we also null out any parent pointers that refer
+   * to enclosing objects not actually needed by the starting closure. We determine
+   * transitivity by tracing through the tree of all methods ultimately invoked by the
+   * inner closure and record all the fields referenced in the process.
+   *
+   * For instance, transitive cleaning is necessary in the following scenario:
+   *
+   *   class SomethingNotSerializable {
+   *     def someValue = 1
+   *     def scope(name: String)(body: => Unit) = body
+   *     def someMethod(): Unit = scope("one") {
+   *       def x = someValue
+   *       def y = 2
+   *       scope("two") { println(y + 1) }
+   *     }
+   *   }
+   *
+   * In this example, scope "two" is not serializable because it references scope "one", which
+   * references SomethingNotSerializable. Note that, however, the body of scope "two" does not
+   * actually depend on SomethingNotSerializable. This means we can safely null out the parent
+   * pointer of a cloned scope "one" and set it as the parent of scope "two", such that scope "two"
+   * no longer references SomethingNotSerializable transitively.
+   *
+   * @param func the starting closure to clean
+   * @param checkSerializable whether to verify that the closure is serializable after cleaning
+   * @param cleanTransitively whether to clean enclosing closures transitively
+   * @param accessedFields a map from a class to a set of its fields that are accessed by
+   *                       the starting closure
+   */
+  private def clean(
+      func: AnyRef,
+      checkSerializable: Boolean,
+      cleanTransitively: Boolean,
+      accessedFields: Map[Class[_], Set[String]]): Unit = {
+
+    // With Scala 2.12 and 2.13 the closure is most likely a serialized lambda (LMF),
+    // so we check for that first; non-LMF closures should become less frequent from now on
+    val lambdaFunc = getSerializedLambda(func)
+
+    if (!isClosure(func.getClass) && lambdaFunc.isEmpty) {
+      LOG.debug(s"Expected a closure; got ${func.getClass.getName}")
+      return
+    }
+
+    // TODO: clean all inner closures first. This requires us to find the inner objects.
+    // TODO: cache outerClasses / innerClasses / accessedFields
+
+    if (func == null) {
+      return
+    }
+
+    if (lambdaFunc.isEmpty) {
+      LOG.debug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
+
+      // A list of classes that represents closures enclosed in the given one
+      val innerClasses = getInnerClosureClasses(func)
+
+      // A list of enclosing objects and their respective classes, from innermost to outermost
+      // An outer object at a given index is of type outer class at the same index
+      val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
+
+      // For logging purposes only
+      val declaredFields = func.getClass.getDeclaredFields
+      val declaredMethods = func.getClass.getDeclaredMethods
 
-    if (outer != null) {
       if (LOG.isDebugEnabled) {
-        LOG.debug("2: Setting $outer on " + func.getClass + " to " + outer)
+        LOG.debug(s" + declared fields: ${declaredFields.size}")
+        declaredFields.foreach { f => LOG.debug(s"     $f") }
+        LOG.debug(s" + declared methods: ${declaredMethods.size}")
+        declaredMethods.foreach { m => LOG.debug(s"     $m") }
+        LOG.debug(s" + inner classes: ${innerClasses.size}")
+        innerClasses.foreach { c => LOG.debug(s"     ${c.getName}") }
+        LOG.debug(s" + outer classes: ${outerClasses.size}" )
+        outerClasses.foreach { c => LOG.debug(s"     ${c.getName}") }
+        LOG.debug(s" + outer objects: ${outerObjects.size}")
+        outerObjects.foreach { o => LOG.debug(s"     $o") }
       }
-      val field = func.getClass.getDeclaredField("$outer")
-      field.setAccessible(true)
-      field.set(func, outer)
+
+      // Fail fast if we detect return statements in closures
+      getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
+      // If accessed fields is not populated yet, we assume that
+      // the closure we are trying to clean is the starting one
+      if (accessedFields.isEmpty) {
+        LOG.debug(" + populating accessed fields because this is the starting closure")
+        // Initialize accessed fields with the outer classes first
+        // This step is needed to associate the fields to the correct classes later
+        initAccessedFields(accessedFields, outerClasses)
+
+        // Populate accessed fields by visiting all fields and methods accessed by this and
+        // all of its inner closures. If transitive cleaning is enabled, this may recursively
+        // visits methods that belong to other classes in search of transitively referenced fields.
+        for (cls <- func.getClass :: innerClasses) {
+          getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0)
+        }
+      }
+
+      LOG.debug(s" + fields accessed by starting closure: " + accessedFields.size)
+      accessedFields.foreach { f => LOG.debug("     " + f) }
+
+      // List of outer (class, object) pairs, ordered from outermost to innermost
+      // Note that all outer objects but the outermost one (first one in this list) must be closures
+      var outerPairs: List[(Class[_], AnyRef)] = outerClasses.zip(outerObjects).reverse
+      var parent: AnyRef = null
+      if (outerPairs.nonEmpty) {
+        val (outermostClass, outermostObject) = outerPairs.head
+        if (isClosure(outermostClass)) {
+          LOG.debug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
+        } else if (outermostClass.getName.startsWith("$line")) {
+          // SPARK-14558: if the outermost object is a REPL line object, we should clone
+          // and clean it as it may carry a lot of unnecessary information,
+          // e.g. hadoop conf, spark conf, etc.
+          LOG.debug(
+            s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
+        } else {
+          // The closure is ultimately nested inside a class; keep the object of that
+          // class without cloning it since we don't want to clone the user's objects.
+          // Note that we still need to keep around the outermost object itself because
+          // we need it to clone its child closure later (see below).
+          LOG.debug(" + outermost object is not a closure or REPL line object, " +
+            "so do not clone it: " + outerPairs.head)
+          parent = outermostObject // e.g. the user's enclosing object
+          outerPairs = outerPairs.tail
+        }
+      } else {
+        LOG.debug(" + there are no enclosing objects!")
+      }
+
+      // Clone the closure objects themselves, nulling out any fields that are not
+      // used in the closure we're working on or any of its inner closures.
+      for ((cls, obj) <- outerPairs) {
+        LOG.debug(s" + cloning the object $obj of class ${cls.getName}")
+        // We null out these unused references by cloning each object and then filling in all
+        // required fields from the original object. We need the parent here because the Java
+        // language specification requires the first constructor parameter of any closure to be
+        // its enclosing object.
+        val clone = cloneAndSetFields(parent, obj, cls, accessedFields)
+
+        // If transitive cleaning is enabled, we recursively clean any enclosing closure using
+        // the already populated accessed fields map of the starting closure
+        if (cleanTransitively && isClosure(clone.getClass)) {
+          LOG.debug(s" + cleaning cloned closure $clone recursively (${cls.getName})")
+          // No need to check serializable here for the outer closures because we're
+          // only interested in the serializability of the starting closure
+          clean(clone, checkSerializable = false, cleanTransitively, accessedFields)
+        }
+        parent = clone
+      }
+
+      // Update the parent pointer ($outer) of this closure
+      if (parent != null) {
+        val field = func.getClass.getDeclaredField("$outer")
+        field.setAccessible(true)
+        // If the starting closure doesn't actually need our enclosing object, then just null it out
+        if (accessedFields.contains(func.getClass) &&
+          !accessedFields(func.getClass).contains("$outer")) {
+          LOG.debug(s" + the starting closure doesn't actually need $parent, so we null it out")
+          field.set(func, null)
+        } else {
+          // Update this closure's parent pointer to point to our enclosing object,
+          // which could either be a cloned closure or the original user object
+          field.set(func, parent)
+        }
+      }
+
+      LOG.debug(s" +++ closure $func (${func.getClass.getName}) is now cleaned +++")
+    } else {
+      LOG.debug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+
+      // scalastyle:off classforname
+      val captClass = Class.forName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
+        false, Thread.currentThread.getContextClassLoader)
+      // scalastyle:on classforname
+      // Fail fast if we detect return statements in closures
+      getClassReader(captClass)
+        .accept(new ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
+      LOG.debug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is now cleaned +++")
     }
 
     if (checkSerializable) {
@@ -165,7 +399,7 @@ object ClosureCleaner {
     }
   }
 
-  def ensureSerializable(func: AnyRef) {
+  private[flink] def ensureSerializable(func: AnyRef) {
     try {
       InstantiationUtil.serializeObject(func)
     } catch {
@@ -173,24 +407,28 @@ object ClosureCleaner {
     }
   }
 
-  private def instantiateClass(cls: Class[_], outer: AnyRef): AnyRef = {
-    if (LOG.isDebugEnabled) {
-      LOG.debug("Creating a " + cls + " with outer = " + outer)
-    }
-    // This is a bona fide closure class, whose constructor has no effects
-    // other than to set its fields, so use its constructor
-    val cons = cls.getConstructors()(0)
-    val params = cons.getParameterTypes.map(createNullValue)
-    if (outer != null) {
-      params(0) = outer // First param is always outer object
+  private def instantiateClass(
+      cls: Class[_],
+      enclosingObject: AnyRef): AnyRef = {
+    // Use reflection to instantiate object without calling constructor
+    val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
+    val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
+    val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
+    val obj = newCtor.newInstance().asInstanceOf[AnyRef]
+    if (enclosingObject != null) {
+      val field = cls.getDeclaredField("$outer")
+      field.setAccessible(true)
+      field.set(obj, enclosingObject)
     }
-    cons.newInstance(params: _*).asInstanceOf[AnyRef]
+    obj
   }
 
+
   /** Copy all data from an InputStream to an OutputStream */
-  def copyStream(in: InputStream,
-                 out: OutputStream,
-                 closeStreams: Boolean = false): Long =
+  def copyStream(
+      in: InputStream,
+      out: OutputStream,
+      closeStreams: Boolean = false): Long =
   {
     var count = 0L
     try {
@@ -228,46 +466,107 @@ object ClosureCleaner {
   }
 }
 
-@Internal
-private[flink]
-class ReturnStatementFinder extends ClassVisitor(ASM5) {
-  override def visitMethod(access: Int, name: String, desc: String,
-                           sig: String, exceptions: Array[String]): MethodVisitor = {
-    if (name.contains("apply")) {
-      new MethodVisitor(ASM5) {
+private class ReturnStatementInClosureException
+  extends FlinkException("Return statements aren't allowed in Flink closures")
+
+private class ReturnStatementFinder(targetMethodName: Option[String] = None)
+  extends ClassVisitor(ASM6) {
+  override def visitMethod(
+      access: Int,
+      name: String,
+      desc: String,
+      sig: String, exceptions: Array[String]): MethodVisitor = {
+
+    // $anonfun$ covers Java 8 lambdas
+    if (name.contains("apply") || name.contains("$anonfun$")) {
+      // A method with suffix "$adapted" will be generated in cases like
+      // { _:Int => return; Seq()} but not { _:Int => return; true}
+      // the closure passed in is $anonfun$t$1$adapted while the actual code resides in
+      // $anonfun$t$1; the visitor will only see the un-adapted name, so we remove the suffix, see
+      // https://github.com/scala/scala-dev/issues/109
+      val isTargetMethod = targetMethodName.isEmpty ||
+        name == targetMethodName.get || name == targetMethodName.get.stripSuffix("$adapted")
+
+      new MethodVisitor(ASM6) {
         override def visitTypeInsn(op: Int, tp: String) {
-          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
-            throw new InvalidProgramException("Return statements aren't allowed in Flink closures")
+          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl") && isTargetMethod) {
+            throw new ReturnStatementInClosureException
           }
         }
       }
     } else {
-      new MethodVisitor(ASM5) {}
+      new MethodVisitor(ASM6) {}
     }
   }
 }
 
-@Internal
-private[flink]
-class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM5) {
-  override def visitMethod(access: Int, name: String, desc: String,
-                           sig: String, exceptions: Array[String]): MethodVisitor = {
-    new MethodVisitor(ASM5) {
+/** Helper class to identify a method. */
+private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
+
+/**
+ * Find the fields accessed by a given class.
+ *
+ * The resulting fields are stored in the mutable map passed in through the constructor.
+ * This map is assumed to have its keys already populated with the classes of interest.
+ *
+ * @param fields the mutable map that stores the fields to return
+ * @param findTransitively if true, find fields indirectly referenced through method calls
+ * @param specificMethod if not empty, visit only this specific method
+ * @param visitedMethods a set of visited methods to avoid cycles
+ */
+private class FieldAccessFinder(
+    fields: Map[Class[_], Set[String]],
+    findTransitively: Boolean,
+    specificMethod: Option[MethodIdentifier[_]] = None,
+    visitedMethods: Set[MethodIdentifier[_]] = Set.empty)
+  extends ClassVisitor(ASM6) {
+
+  override def visitMethod(
+      access: Int,
+      name: String,
+      desc: String,
+      sig: String,
+      exceptions: Array[String]): MethodVisitor = {
+
+    // If we are told to visit only a certain method and this is not the one, ignore it
+    if (specificMethod.isDefined &&
+      (specificMethod.get.name != name || specificMethod.get.desc != desc)) {
+      return null
+    }
+
+    new MethodVisitor(ASM6) {
       override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
         if (op == GETFIELD) {
-          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
-            output(cl) += name
+          for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) {
+            fields(cl) += name
           }
         }
       }
 
-      override def visitMethodInsn(op: Int, owner: String, name: String,
-                                   desc: String) {
-        // Check for calls a getter method for a variable in an interpreter wrapper object.
-        // This means that the corresponding field will be accessed, so we should save it.
-        if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
-          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
-            output(cl) += name
+      override def visitMethodInsn(
+          op: Int, owner: String, name: String, desc: String, itf: Boolean) {
+        for (cl <- fields.keys if cl.getName == owner.replace('/', '.')) {
+          // Check for calls to a getter method for a variable in an interpreter wrapper object.
+          // This means that the corresponding field will be accessed, so we should save it.
+          if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
+            fields(cl) += name
+          }
+          // Optionally visit other methods to find fields that are transitively referenced
+          if (findTransitively) {
+            val m = MethodIdentifier(cl, name, desc)
+            if (!visitedMethods.contains(m)) {
+              // Keep track of visited methods to avoid potential infinite cycles
+              visitedMethods += m
+
+              var currentClass = cl
+              assert(currentClass != null, "The outer class can't be null.")
+
+              while (currentClass != null) {
+                ClosureCleaner.getClassReader(currentClass).accept(
+                  new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0)
+                currentClass = currentClass.getSuperclass()
+              }
+            }
           }
         }
       }
@@ -275,10 +574,14 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor
   }
 }
 
-@Internal
-private[flink] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM5) {
+private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM6) {
   var myName: String = null
 
+  // TODO: Recursively find inner closures that we indirectly reference, e.g.
+  //   val closure1 = () => { () => 1 }
+  //   val closure2 = () => { (1 to 5).map(closure1) }
+  // The second closure technically has two inner closures, but this finder only finds one
+
   override def visit(version: Int, access: Int, name: String, sig: String,
                      superName: String, interfaces: Array[String]) {
     myName = name
@@ -286,20 +589,21 @@ private[flink] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
 
   override def visitMethod(access: Int, name: String, desc: String,
                            sig: String, exceptions: Array[String]): MethodVisitor = {
-    new MethodVisitor(ASM5) {
-      override def visitMethodInsn(op: Int, owner: String, name: String,
-                                   desc: String) {
+    new MethodVisitor(ASM6) {
+      override def visitMethodInsn(
+          op: Int, owner: String, name: String, desc: String, itf: Boolean) {
         val argTypes = Type.getArgumentTypes(desc)
-        if (op == INVOKESPECIAL && name == "<init>" && argTypes.nonEmpty
+        if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
           && argTypes(0).toString.startsWith("L") // is it an object?
           && argTypes(0).getInternalName == myName) {
+          // scalastyle:off classforname
           output += Class.forName(
             owner.replace('/', '.'),
             false,
             Thread.currentThread.getContextClassLoader)
+          // scalastyle:on classforname
         }
       }
     }
   }
 }
-
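
For context, a sketch (hypothetical user code, not part of the change) of the public entry point that survives this rewrite: ClosureCleaner.clean is what Flink's Scala APIs call, via their clean(...) helpers, on user functions before serializing them.

    import org.apache.flink.api.scala.ClosureCleaner

    object ClosureCleanerSketch {
      def main(args: Array[String]): Unit = {
        val factor = 2
        val mapper: String => Int = s => s.length * factor

        // Cleans the closure in place: prunes enclosing references the closure does not
        // actually use (pre-2.12 anonfun classes), rejects closures containing return
        // statements, and finally checks that the result is serializable.
        ClosureCleaner.clean(mapper, checkSerializable = true, cleanTransitively = true)
      }
    }
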
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 74f5f5bef8f..a9fbbedbc71 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -397,7 +397,7 @@ private[flink] trait TypeInformationGen[C <: Context] {
     val result = c.inferImplicitValue(
       c.weakTypeOf[TypeInformation[T]],
       silent = true,
-      withMacrosDisabled =  false,
+      withMacrosDisabled = true,
       pos = c.enclosingPosition)
 
     if (result.isEmpty) {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
index b2521b07a70..9f28c3de94f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
@@ -53,7 +54,7 @@ class OnDataSet[T](ds: DataSet[T]) {
   @PublicEvolving
   def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.mapPartition {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
@@ -100,7 +101,7 @@ class OnDataSet[T](ds: DataSet[T]) {
   @PublicEvolving
   def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.reduceGroup {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
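
The explicit Iterator/Collector parameter types above appear to be needed because, with 2.12's SAM conversion, an untyped lambda no longer picks a unique mapPartition/reduceGroup overload. User-facing code through the acceptPartialFunctions extension is unchanged; a small sketch, assuming the standard extension import:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.extensions._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val words: DataSet[String] = env.fromElements("a", "bb", "ccc")

    // Each partition is exposed as a Stream[String]; here we emit its element count.
    val partitionSizes: DataSet[Int] = words.mapPartitionWith(partition => partition.size)
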
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
index 07abccb4238..36363588877 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
@@ -65,7 +66,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
   @PublicEvolving
   def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.reduceGroup {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
@@ -80,7 +81,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
   @PublicEvolving
   def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
     ds.combineGroup {
-      (it, out) =>
+      (it: Iterator[T], out: Collector[R]) =>
         out.collect(fun(it.toStream))
     }
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index 462007567d7..da93ca9b156 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -90,6 +90,19 @@ private[flink] abstract class UnfinishedKeyPairOperation[L, R, O](
     val leftKey = new Keys.SelectorFunctionKeys[L, K](keyExtractor, leftInput.getType, keyType)
     new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey)
   }
+
+  /**
+   * Specify the key selector function for the left side of the key based operation. This returns
+   * a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify the
+   * key for the right side. The result after specifying the right side key is the finished
+   * operation.
+   */
+  def where[K: TypeInformation](fun: KeySelector[L, K]) = {
+    val keyType = implicitly[TypeInformation[K]]
+    val leftKey =
+      new Keys.SelectorFunctionKeys[L, K](leftInput.clean(fun), leftInput.getType, keyType)
+    new HalfUnfinishedKeyPairOperation[L, R, O](this, leftKey)
+  }
 }
 
 @Internal
@@ -140,6 +153,25 @@ private[flink] class HalfUnfinishedKeyPairOperation[L, R, O](
       keyExtractor,
       unfinished.rightInput.getType,
       keyType)
+
+    if (!leftKey.areCompatible(rightKey)) {
+      throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+        leftKey + " Right: " + rightKey)
+    }
+    unfinished.finish(leftKey, rightKey)
+  }
+
+  /**
+   * Specify the key selector function for the right side of the key based operation. This returns
+   * the finished operation.
+   */
+  def equalTo[K: TypeInformation](fun: KeySelector[R, K]): O = {
+
+    val keyType = implicitly[TypeInformation[K]]
+    val rightKey = new Keys.SelectorFunctionKeys[R, K](
+      unfinished.leftInput.clean(fun),
+      unfinished.rightInput.getType,
+      keyType)
     
     if (!leftKey.areCompatible(rightKey)) {
       throw new InvalidProgramException("The types of the key fields do not match. Left: " +
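
Illustrative use of the new KeySelector-based where/equalTo overloads (the Order and Payment types are hypothetical, not from this patch):

    import org.apache.flink.api.java.functions.KeySelector
    import org.apache.flink.api.scala._

    case class Order(id: String, amount: Double)
    case class Payment(orderId: String, paid: Double)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val orders: DataSet[Order] = env.fromElements(Order("o1", 10.0))
    val payments: DataSet[Payment] = env.fromElements(Payment("o1", 10.0))

    // Explicit KeySelector instances now work on both sides of the join,
    // alongside the existing field-name and function-literal variants.
    val joined = orders.join(payments)
      .where(new KeySelector[Order, String] {
        override def getKey(o: Order): String = o.id
      })
      .equalTo(new KeySelector[Payment, String] {
        override def getKey(p: Payment): String = p.orderId
      })
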
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index d543998c1eb..04bdab697c7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -72,7 +72,9 @@ package object utils {
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
         implicitly[TypeInformation[T]]
       )
-      wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+      wrap(jutils.zipWithIndex(self.javaSet)).map {
+        t: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1)
+      }
     }
 
     /**
@@ -85,7 +87,9 @@ package object utils {
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
         implicitly[TypeInformation[T]]
       )
-      wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+      wrap(jutils.zipWithUniqueId(self.javaSet)).map {
+        t: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1)
+      }
     }
 
     // --------------------------------------------------------------------------------------------
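
The explicit Tuple2 parameter types above keep zipWithIndex/zipWithUniqueId compiling under 2.12; usage is unchanged, e.g. (sketch):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data: DataSet[String] = env.fromElements("a", "b", "c")

    // Each element is paired with a consecutive Long index (0, 1, 2, ...).
    val indexed: DataSet[(Long, String)] = data.zipWithIndex
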
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 397be35794a..d7cebb48502 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -248,7 +248,7 @@ under the License.
 					<parameter>
 						<excludes combine.children="append">
 							<!-- Exclude generated classes from api compatibility checks -->
-							<exclude>*\$\$anon\$*</exclude>
+							<exclude>*$$$$anon$$*</exclude>
 
 							<!-- Ignore method which was created automatically by Scala for default value calculation.
 							Can be removed once https://github.com/siom79/japicmp/issues/176 will be fixed -->
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 23260cca787..23d216549f8 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -414,6 +414,18 @@ class DataStream[T](stream: JavaStream[T]) {
     asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
   }
 
+  /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K] = {
+
+    val cleanFun = clean(fun)
+    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
+
+    asScalaStream(new JavaKeyedStream(stream, cleanFun, keyType))
+  }
+
   /**
    * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
    * This method takes the key position to partition on, and a partitioner that accepts the key
@@ -1112,7 +1124,7 @@ class DataStream[T](stream: JavaStream[T]) {
     }
     val cleanFun = clean(fun)
     val sinkFunction = new SinkFunction[T] {
-      def invoke(in: T) = cleanFun(in)
+      override def invoke(in: T) = cleanFun(in)
     }
     this.addSink(sinkFunction)
   }
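
A sketch of the new KeySelector-based keyBy on DataStream (Event is a hypothetical type, not from the patch):

    import org.apache.flink.api.java.functions.KeySelector
    import org.apache.flink.streaming.api.scala._

    case class Event(id: String, value: Long)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[Event] = env.fromElements(Event("a", 1L), Event("a", 2L))

    // The new overload accepts an explicit KeySelector instead of a Scala function literal.
    val keyed: KeyedStream[Event, String] = events.keyBy(new KeySelector[Event, String] {
      override def getKey(e: Event): String = e.id
    })

    val sums: DataStream[Event] = keyed.reduce((a, b) => Event(a.id, a.value + b.value))
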
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index e8d5a1221bb..e4a81a77a69 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -322,9 +322,10 @@ class AllWindowTranslationTest {
     val window1 = source
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
       .reduce(
-        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
-          override def apply(
-              window: TimeWindow,
+        new DummyReducer,
+        new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def process(
+              context: Context,
               input: Iterable[(String, Int)],
               out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
         })
@@ -361,9 +362,9 @@ class AllWindowTranslationTest {
     val window1 = source
       .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
       .reduce(
-        new DummyReducer, new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
-          override def apply(
-              window: TimeWindow,
+        new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+          override def process(
+              context: Context,
               input: Iterable[(String, Int)],
               out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
         })
@@ -519,7 +520,10 @@ class AllWindowTranslationTest {
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
       .reduce(
         { (x, _) => x },
-        { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+        {
+          (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
+            in foreach { x => out.collect(x)}
+        })
 
     val transform = window1
       .javaStream
@@ -756,7 +760,7 @@ class AllWindowTranslationTest {
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
       .aggregate(
         new DummyAggregator(),
-        { (_, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
+        { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
           in foreach { x => out.collect(x)}
         } })
 
@@ -898,9 +902,9 @@ class AllWindowTranslationTest {
       .fold(
         ("", "", 1),
         new DummyFolder,
-        new AllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
-          override def apply(
-              window: TimeWindow,
+        new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+          override def process(
+              context: Context,
               input: Iterable[(String, String, Int)],
               out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
         })
@@ -1104,7 +1108,7 @@ class AllWindowTranslationTest {
       .fold(
         ("", "", 1),
         { (acc: (String, String, Int), _) => acc },
-        { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+        { (_: TimeWindow, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
           in foreach { x => out.collect((x._1, x._3)) }
         })
 
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 5412e8e63ee..f6aede4184c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -86,7 +86,7 @@ class CoGroupJoinITCase extends AbstractTestBase {
           "F:" + first.mkString("") + " S:" + second.mkString("")
       }
       .addSink(new SinkFunction[String]() {
-        def invoke(value: String) {
+        override def invoke(value: String) {
           CoGroupJoinITCase.testResults += value
         }
       })
@@ -154,7 +154,7 @@ class CoGroupJoinITCase extends AbstractTestBase {
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply( (l, r) => l.toString + ":" + r.toString)
       .addSink(new SinkFunction[String]() {
-        def invoke(value: String) {
+        override def invoke(value: String) {
           CoGroupJoinITCase.testResults += value
         }
       })
@@ -216,10 +216,10 @@ class CoGroupJoinITCase extends AbstractTestBase {
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply( (l, r) => l.toString + ":" + r.toString)
       .addSink(new SinkFunction[String]() {
-      def invoke(value: String) {
-        CoGroupJoinITCase.testResults += value
-      }
-    })
+        override def invoke(value: String) {
+          CoGroupJoinITCase.testResults += value
+        }
+      })
 
     env.execute("Self-Join Test")
 
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 7b7dde5edae..f65caa49608 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -60,7 +60,7 @@ class DataStreamTest extends AbstractTestBase {
 
     val dataStream2 = env.generateSequence(0, 0).name("testSource2")
       .keyBy(x=>x)
-      .reduce((x, y) => 0)
+      .reduce((x, y) => 0L)
       .name("testReduce")
     assert("testReduce" == dataStream2.getName)
 
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index ef276856e50..9bb0caf1b60 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -75,7 +75,7 @@ class WindowFoldITCase extends AbstractTestBase {
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .fold(("R:", 0)) { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) }
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
         WindowFoldITCase.testResults += value.toString
         }
       })
@@ -132,7 +132,7 @@ class WindowFoldITCase extends AbstractTestBase {
         foldFunc,
         new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFoldITCase.testResults += value.toString
         }
       })
@@ -191,7 +191,7 @@ class WindowFoldITCase extends AbstractTestBase {
         foldFunc,
         new CheckingIdentityRichProcessWindowFunction[(Int, String), Tuple, TimeWindow]())
       .addSink(new SinkFunction[(Int, String)]() {
-        def invoke(value: (Int, String)) {
+        override def invoke(value: (Int, String)) {
           WindowFoldITCase.testResults += value.toString
         }
       })
@@ -239,7 +239,7 @@ class WindowFoldITCase extends AbstractTestBase {
       .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .fold(("R:", 0)) { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) }
       .addSink(new SinkFunction[(String, Int)]() {
-      def invoke(value: (String, Int)) {
+      override def invoke(value: (String, Int)) {
         WindowFoldITCase.testResults += value.toString
       }
     })
@@ -294,7 +294,7 @@ class WindowFoldITCase extends AbstractTestBase {
         foldFunc,
         new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFoldITCase.testResults += value.toString
         }
       })
@@ -351,7 +351,7 @@ class WindowFoldITCase extends AbstractTestBase {
         foldFunc,
         new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFoldITCase.testResults += value.toString
         }
       })
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index ce5f09ebf28..5914d0b0b83 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -71,7 +71,7 @@ class WindowFunctionITCase extends TestLogger {
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply(new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFunctionITCase.testResults += value.toString
         }
       })
@@ -120,7 +120,7 @@ class WindowFunctionITCase extends TestLogger {
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .process(new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFunctionITCase.testResults += value.toString
         }
       })
@@ -168,7 +168,7 @@ class WindowFunctionITCase extends TestLogger {
       .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .apply(new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFunctionITCase.testResults += value.toString
         }
       })
@@ -216,7 +216,7 @@ class WindowFunctionITCase extends TestLogger {
       .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .process(new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowFunctionITCase.testResults += value.toString
         }
       })
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index b2137f59dc3..748d5a17f62 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -75,7 +75,7 @@ class WindowReduceITCase extends AbstractTestBase {
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowReduceITCase.testResults += value.toString
         }
       })
@@ -131,7 +131,7 @@ class WindowReduceITCase extends AbstractTestBase {
         reduceFunc,
         new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowReduceITCase.testResults += value.toString
         }
       })
@@ -189,7 +189,7 @@ class WindowReduceITCase extends AbstractTestBase {
         reduceFunc,
         new CheckingIdentityRichProcessWindowFunction[(String, Int), Tuple, TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowReduceITCase.testResults += value.toString
         }
       })
@@ -237,7 +237,7 @@ class WindowReduceITCase extends AbstractTestBase {
       .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
       .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
       .addSink(new SinkFunction[(String, Int)]() {
-      def invoke(value: (String, Int)) {
+      override def invoke(value: (String, Int)) {
         WindowReduceITCase.testResults += value.toString
       }
     })
@@ -291,7 +291,7 @@ class WindowReduceITCase extends AbstractTestBase {
         reduceFunc,
         new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowReduceITCase.testResults += value.toString
         }
       })
@@ -347,7 +347,7 @@ class WindowReduceITCase extends AbstractTestBase {
         reduceFunc,
         new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
+        override def invoke(value: (String, Int)) {
           WindowReduceITCase.testResults += value.toString
         }
       })
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 916884ff5ff..b501d9e15db 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -582,7 +582,10 @@ class WindowTranslationTest {
       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
       .reduce(
         { (x, _) => x },
-        { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+        {
+          (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
+            in foreach { x => out.collect(x)}
+        })
 
     val transform = window1
       .javaStream
@@ -824,10 +827,11 @@ class WindowTranslationTest {
     val window1 = source
       .keyBy(_._1)
       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
-      .aggregate(new DummyAggregator(),
-        { (_, _, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
+      .aggregate(
+        new DummyAggregator(),
+        { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
           in foreach { x => out.collect(x)}
-        } })
+        })
 
     val transform = window1
       .javaStream
@@ -1232,8 +1236,12 @@ class WindowTranslationTest {
       .fold(
         ("", "", 1),
         { (acc: (String, String, Int), _) => acc },
-        { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
-          in foreach { x => out.collect((x._1, x._3)) }
+        { (
+            _: String,
+            _: TimeWindow,
+            in: Iterable[(String, String, Int)],
+            out: Collector[(String, Int)]) =>
+              in foreach { x => out.collect((x._1, x._3)) }
         })
 
     val transform = window1
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 66a811a6379..47bb21c0714 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -39,12 +39,12 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeoutException;
 
 import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.Duration;
-import scala.concurrent.forkjoin.ForkJoinPool;
 import scala.concurrent.impl.ExecutionContextImpl;
 
 import static org.junit.Assert.fail;
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.2-jobmanager-savepoint
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.2-jobmanager-savepoint
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.2-rocksdb-savepoint
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.2-rocksdb-savepoint
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
similarity index 100%
rename from flink-tests/src/test/resources/stateful-scala-with-broadcast2.11-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
rename to flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
deleted file mode 100644
index 3d0f8c5cdb3..00000000000
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
deleted file mode 100644
index 5a763dfd406..00000000000
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
deleted file mode 100644
index 4e686852170..00000000000
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
deleted file mode 100644
index 6c9e433f956..00000000000
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 8f1e1f88084..2a4dcf3e242 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -184,7 +184,7 @@ object TestObjectWithBogusReturns {
     try {
       nums.map { x => return 1; x * 2}.print()
     } catch {
-      case inv: InvalidProgramException => // all good
+      case inv: ReturnStatementInClosureException => // all good
       case _: Throwable => fail("Bogus return statement not detected.")
     }
 
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
index 4ae57c46920..980be5c22db 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
@@ -18,10 +18,13 @@
 
 package org.apache.flink.api.scala.migration
 
+@SerialVersionUID(-1196651213850142840L)
 case class CustomCaseClass(a: String, b: Long)
 
+@SerialVersionUID(-4562410952220372998L)
 case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
 
+@SerialVersionUID(137486769747470244L)
 object CustomEnum extends Enumeration {
   type CustomEnum = Value
   val ONE, TWO, THREE, FOUR = Value
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 557f54aa168..db9d5b40981 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -67,11 +67,6 @@ object StatefulJobSavepointMigrationITCase {
   val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_4
   val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME
 
-  val SCALA_VERSION: String = {
-    val versionString = Properties.versionString.split(" ")(1)
-    versionString.substring(0, versionString.lastIndexOf("."))
-  }
-
   val NUM_ELEMENTS = 4
 }
 
@@ -115,9 +110,7 @@ class StatefulJobSavepointMigrationITCase(
 
     executeAndSavepoint(
       env,
-      s"src/test/resources/stateful-scala" +
-        s"${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
-        s"-udf-migration-itcase-flink" +
+      s"src/test/resources/stateful-scala-udf-migration-itcase-flink" +
         s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
         s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
       new Tuple2(
@@ -159,7 +152,7 @@ class StatefulJobSavepointMigrationITCase(
     restoreAndExecute(
       env,
       SavepointMigrationTestBase.getResourceFilename(
-        s"stateful-scala${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+        s"stateful-scala" +
           s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
           s"-${migrationVersionAndBackend._2}-savepoint"),
       new Tuple2(
@@ -229,7 +222,7 @@ class StatefulJobSavepointMigrationITCase(
     }
 
     @throws[Exception]
-    def invoke(value: T) {
+    override def invoke(value: T) {
       count += 1
       getRuntimeContext.getAccumulator(
         AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR).add(1)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 19575b49a5e..4f236e7c8ca 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -65,11 +65,6 @@ object StatefulJobWBroadcastStateMigrationITCase {
   val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_6
   val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.MEMORY_STATE_BACKEND_NAME
 
-  val SCALA_VERSION: String = {
-    val versionString = Properties.versionString.split(" ")(1)
-    versionString.substring(0, versionString.lastIndexOf("."))
-  }
-
   val NUM_ELEMENTS = 4
 }
 
@@ -138,7 +133,6 @@ class StatefulJobWBroadcastStateMigrationITCase(
     executeAndSavepoint(
       env,
       s"src/test/resources/stateful-scala-with-broadcast" +
-        s"${StatefulJobWBroadcastStateMigrationITCase.SCALA_VERSION}" +
         s"-udf-migration-itcase-flink" +
         s"${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_VER}" +
         s"-${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
@@ -211,7 +205,7 @@ class StatefulJobWBroadcastStateMigrationITCase(
     restoreAndExecute(
       env,
       SavepointMigrationTestBase.getResourceFilename(
-        s"stateful-scala-with-broadcast${StatefulJobWBroadcastStateMigrationITCase.SCALA_VERSION}" +
+        s"stateful-scala-with-broadcast" +
           s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
           s"-${migrationVersionAndBackend._2}-savepoint"),
       new Tuple2(
@@ -318,7 +312,7 @@ private class AccumulatorCountingSink[T] extends RichSinkFunction[T] {
   }
 
   @throws[Exception]
-  def invoke(value: T) {
+  override def invoke(value: T) {
     count += 1
     getRuntimeContext.getAccumulator(
       AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR).add(1)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 448479e8171..7746cd0dde3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -108,12 +108,15 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-      (first, second, out: Collector[(Int, Long, String)] ) =>
-        for (t <- first) {
-          if (t._1 < 6) {
-            out.collect(t)
+      (
+        first: Iterator[(Int, Long, String)],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)] ) =>
+          for (t <- first) {
+            if (t._1 < 6) {
+              out.collect(t)
+            }
           }
-        }
     }
     coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -127,12 +130,15 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.get5TupleDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
-      (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
-        for (t <- second) {
-          if (t._1 < 4) {
-            out.collect(t)
+      (
+        first: Iterator[(Int, Long, Int, String, Long)],
+        second: Iterator[(Int, Long, Int, String, Long)],
+        out: Collector[(Int, Long, Int, String, Long)]) =>
+          for (t <- second) {
+            if (t._1 < 4) {
+              out.collect(t)
+            }
           }
-        }
     }
     coGroupDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -247,13 +253,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds1 = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
     val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        val strs = first map(_._4)
-        for (t <- second) {
-          for (s <- strs) {
-            out.collect((t._1, t._2, s))
+      (
+        first: Iterator[(Int, Long, Int, String, Long)],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)]) =>
+          val strs = first map(_._4)
+          for (t <- second) {
+            for (s <- strs) {
+              out.collect((t._1, t._2, s))
+            }
           }
-        }
     }
 
     coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -273,13 +282,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
     val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
       .apply {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        val strs = first map(_._4)
-        for (t <- second) {
-          for (s <- strs) {
-            out.collect((t._1, t._2, s))
+      (
+        first: Iterator[(Int, Long, Int, String, Long)],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)]) =>
+          val strs = first map(_._4)
+          for (t <- second) {
+            for (s <- strs) {
+              out.collect((t._1, t._2, s))
+            }
           }
-        }
     }
 
     coGrouped.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -325,13 +337,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.getSmallPojoDataSet(env)
     val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
-      (first, second, out: Collector[CustomType]) =>
-        for (p <- first) {
-          for (t <- second) {
-            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+      (
+        first: Iterator[CollectionDataSets.POJO],
+        second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+        out: Collector[CustomType]) =>
+          for (p <- first) {
+            for (t <- second) {
+              Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+              out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+            }
           }
-        }
     }
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -348,13 +363,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.getSmallPojoDataSet(env)
     val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
-      (first, second, out: Collector[CustomType]) =>
-        for (p <- first) {
-          for (t <- second) {
-            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+      (
+        first: Iterator[CollectionDataSets.POJO],
+        second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+        out: Collector[CustomType]) =>
+          for (p <- first) {
+            for (t <- second) {
+              Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+              out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+            }
           }
-        }
     }
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -371,13 +389,16 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds = CollectionDataSets.getSmallPojoDataSet(env)
     val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
     val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
-      (first, second, out: Collector[CustomType]) =>
-        for (p <- first) {
-          for (t <- second) {
-            Assert.assertTrue(p.nestedPojo.longNumber == t._7)
-            out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+      (
+        first: Iterator[CollectionDataSets.POJO],
+        second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+        out: Collector[CustomType]) =>
+          for (p <- first) {
+            for (t <- second) {
+              Assert.assertTrue(p.nestedPojo.longNumber == t._7)
+              out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"))
+            }
           }
-        }
     }
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
@@ -390,14 +411,17 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
     val ds2 = env.fromElements(0, 1, 2)
     val coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*") {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        for (p <- first) {
-          for (t <- second) {
-            if (p._1 == t) {
-              out.collect(p)
+      (
+        first: Iterator[(Int, Long, String)],
+        second: Iterator[Int],
+        out: Collector[(Int, Long, String)]) =>
+          for (p <- first) {
+            for (t <- second) {
+              if (p._1 == t) {
+                out.collect(p)
+              }
             }
           }
-        }
     }
 
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -411,14 +435,17 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
     val ds1 = env.fromElements(0, 1, 2)
     val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
     val coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0) {
-      (first, second, out: Collector[(Int, Long, String)]) =>
-        for (p <- first) {
-          for (t <- second) {
-            if (p == t._1) {
-              out.collect(t)
+      (
+        first: Iterator[Int],
+        second: Iterator[(Int, Long, String)],
+        out: Collector[(Int, Long, String)]) =>
+          for (p <- first) {
+            for (t <- second) {
+              if (p == t._1) {
+                out.collect(t)
+              }
             }
           }
-        }
     }
 
     coGroupDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 9d286fe1e9f..fabefb9081f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -46,7 +46,10 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     ds.combineGroup(new ScalaGroupCombineFunctionExample())
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
-    ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+    ds
+      .combineGroup(
+        (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+          in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     // all methods on UnsortedGrouping
@@ -55,7 +58,9 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.groupBy(0)
-      .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .combineGroup(
+        (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+          in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     // all methods on SortedGrouping
@@ -64,7 +69,9 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     ds.groupBy(0).sortGroup(0, Order.ASCENDING)
-      .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+      .combineGroup(
+        (in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+          in.toSet foreach (out.collect))
       .output(new DiscardingOutputFormat[Tuple1[String]])
 
     env.execute()
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 385691eb6db..9262cb3ac90 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -269,17 +269,19 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
       .map( t => MutableTuple3(t._1, t._2, t._3) )
     
     val reduceDs =  ds.groupBy(1).reduceGroup {
-      (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
-        for (t <- in) {
-          if (t._1 < 4) {
-            t._3 = "Hi!"
-            t._1 += 10
-            out.collect(t)
-            t._1 += 10
-            t._3 = "Hi again!"
-            out.collect(t)
+      (
+        in: Iterator[MutableTuple3[Int, Long, String]],
+        out: Collector[MutableTuple3[Int, Long, String]]) =>
+          for (t <- in) {
+            if (t._1 < 4) {
+              t._3 = "Hi!"
+              t._1 += 10
+              out.collect(t)
+              t._1 += 10
+              t._3 = "Hi again!"
+              out.collect(t)
+            }
           }
-        }
     }
     val result: Seq[String] = reduceDs.collect().map(x => s"${x._1},${x._2},${x._3}").sorted
     
@@ -488,13 +490,15 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds =  CollectionDataSets.getPojoContainingTupleAndWritable(env)
     
     val reduceDs = ds.groupBy("hadoopFan", "theTuple.*").reduceGroup {
-      (values, out: Collector[Int]) => {
-        var c: Int = 0
-        for (v <- values) {
-          c += 1
+      (
+        values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable],
+        out: Collector[Int]) => {
+          var c: Int = 0
+          for (v <- values) {
+            c += 1
+          }
+          out.collect(c)
         }
-        out.collect(c)
-      }
     }
     
     val result: Seq[Int] = reduceDs.collect().sorted
@@ -511,9 +515,11 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds = CollectionDataSets.getTupleContainingPojos(env)
     
     val reduceDs =  ds.groupBy("_1", "_2.*").reduceGroup {
-      (values, out: Collector[Int]) => {
-        out.collect(values.size)
-      }
+      (
+        values: Iterator[(Int, CollectionDataSets.CrazyNested, CollectionDataSets.POJO)],
+        out: Collector[Int]) => {
+          out.collect(values.size)
+        }
     }
     
     val result: Seq[Int] = reduceDs.collect().sorted
@@ -643,20 +649,22 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
       .sortGroup("theTuple._1", Order.DESCENDING)
       .sortGroup("theTuple._2", Order.DESCENDING)
       .reduceGroup {
-        (values, out: Collector[String]) => {
-          var once: Boolean = false
-          val concat: StringBuilder = new StringBuilder
-          for (value <- values) {
-            if (!once) {
-              concat.append(value.hadoopFan.get)
-              concat.append("---")
-              once = true
+        (
+          values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable],
+          out: Collector[String]) => {
+            var once: Boolean = false
+            val concat: StringBuilder = new StringBuilder
+            for (value <- values) {
+              if (!once) {
+                concat.append(value.hadoopFan.get)
+                concat.append("---")
+                once = true
+              }
+              concat.append(value.theTuple)
+              concat.append("-")
             }
-            concat.append(value.theTuple)
-            concat.append("-")
+            out.collect(concat.toString())
           }
-          out.collect(concat.toString())
-        }
       }
     
     val result: Seq[String] = reduceDs.map(_.toString()).collect().sorted
@@ -803,13 +811,15 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds =  CollectionDataSets.getPojoWithMultiplePojos(env)
     
     val reduceDs =  ds.groupBy("p2.a2").reduceGroup {
-      (values, out: Collector[String]) => {
-        val concat: StringBuilder = new StringBuilder()
-        for (value <- values) {
-          concat.append(value.p2.a2)
+      (
+        values: Iterator[CollectionDataSets.PojoWithMultiplePojos],
+        out: Collector[String]) => {
+          val concat: StringBuilder = new StringBuilder()
+          for (value <- values) {
+            concat.append(value.p2.a2)
+          }
+          out.collect(concat.toString())
         }
-        out.collect(concat.toString())
-      }
     }
     
     val result : Seq[String] = reduceDs.map(_.toString()).collect().sorted
diff --git a/pom.xml b/pom.xml
index d5149e0925d..342fca22aff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,6 @@ under the License.
 		<module>flink-test-utils-parent</module>
 		<module>flink-state-backends</module>
 		<module>flink-libraries</module>
-		<module>flink-scala-shell</module>
 		<module>flink-quickstart</module>
 		<module>flink-contrib</module>
 		<module>flink-dist</module>
@@ -112,10 +111,11 @@ under the License.
 		<maven.compiler.source>${java.version}</maven.compiler.source>
 		<maven.compiler.target>${java.version}</maven.compiler.target>
 		<scala.macros.version>2.1.0</scala.macros.version>
-		<!-- Default scala versions, may be overwritten by build profiles -->
-		<scala.version>2.11.12</scala.version>
-		<scala.binary.version>2.11</scala.binary.version>
-		<chill.version>0.7.4</chill.version>
+		<!-- Default scala versions, must be overwritten by build profiles, so we set something
+		invalid here -->
+		<scala.version>INVALID</scala.version>
+		<scala.binary.version>INVALID</scala.binary.version>
+		<chill.version>0.7.6</chill.version>
 		<zookeeper.version>3.4.10</zookeeper.version>
 		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.7.9</jackson.version>
@@ -244,6 +244,12 @@ under the License.
 				<version>5.0.4-${flink.shaded.version}</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.apache.flink</groupId>
+				<artifactId>flink-shaded-asm-6</artifactId>
+				<version>6.2.1-${flink.shaded.version}</version>
+			</dependency>
+
 			<dependency>
 				<groupId>org.apache.flink</groupId>
 				<artifactId>flink-shaded-guava</artifactId>
@@ -471,10 +477,16 @@ under the License.
 				<version>${scala.version}</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.scala-lang.modules</groupId>
+				<artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
+				<version>1.0.4</version>
+			</dependency>
+
 			<dependency>
 				<groupId>org.clapper</groupId>
 				<artifactId>grizzled-slf4j_${scala.binary.version}</artifactId>
-				<version>1.0.2</version>
+				<version>1.3.2</version>
 			</dependency>
 
 			<dependency>
@@ -537,7 +549,7 @@ under the License.
 			<dependency>
 				<groupId>org.scalatest</groupId>
 				<artifactId>scalatest_${scala.binary.version}</artifactId>
-				<version>2.2.2</version>
+				<version>3.0.0</version>
 				<scope>test</scope>
 			</dependency>
 
@@ -613,6 +625,106 @@ under the License.
 
 	<profiles>
 
+		<profile>
+			<id>scala-2.11</id>
+			<properties>
+				<scala.version>2.11.12</scala.version>
+				<scala.binary.version>2.11</scala.binary.version>
+			</properties>
+			<modules>
+				<module>flink-scala-shell</module>
+			</modules>
+			<build>
+				<plugins>
+					<!-- make sure we don't have any _2.10 or _2.12 dependencies when building
+					for Scala 2.11 -->
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-enforcer-plugin</artifactId>
+						<executions>
+							<execution>
+								<id>enforce-versions</id>
+								<goals>
+									<goal>enforce</goal>
+								</goals>
+								<configuration>
+									<rules>
+										<bannedDependencies>
+											<excludes combine.children="append">
+												<exclude>*:*_2.12</exclude>
+												<exclude>*:*_2.10</exclude>
+											</excludes>
+										</bannedDependencies>
+									</rules>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+
+
+		<profile>
+			<id>scala-2.12</id>
+			<properties>
+				<scala.version>2.12.7</scala.version>
+				<scala.binary.version>2.12</scala.binary.version>
+			</properties>
+			<build>
+				<plugins>
+					<!-- don't run tests that don't work for Scala 2.12, because not all of the
+					required test dependencies are available for Scala 2.12. The Kafka 0.9 connector
+					still works with Scala 2.12 because it only needs the scala-version-independent
+					kafka-clients dependency at runtime. -->
+					<plugin>
+						<groupId>org.codehaus.mojo</groupId>
+						<artifactId>build-helper-maven-plugin</artifactId>
+						<version>1.12</version>
+						<executions>
+							<execution>
+								<id>regex-property</id>
+								<goals>
+									<goal>regex-property</goal>
+								</goals>
+								<configuration>
+									<name>maven.test.skip</name>
+									<value>${project.artifactId}</value>
+									<regex>(flink-connector-kafka-0.9.*)|(flink-scala-shell.*)</regex>
+									<replacement>true</replacement>
+									<failIfNoMatch>false</failIfNoMatch>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+					<!-- make sure we don't have any _2.10 or _2.11 dependencies when building
+					for Scala 2.12 -->
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-enforcer-plugin</artifactId>
+						<executions>
+							<execution>
+								<id>enforce-versions</id>
+								<goals>
+									<goal>enforce</goal>
+								</goals>
+								<configuration>
+									<rules>
+										<bannedDependencies>
+											<excludes combine.children="append">
+												<exclude>*:*_2.11</exclude>
+												<exclude>*:*_2.10</exclude>
+											</excludes>
+										</bannedDependencies>
+									</rules>
+								</configuration>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+
 		<profile>
 			<id>java9</id>
 			<activation>
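
[Illustration only, not part of the patch] With the parent pom now setting scala.version and scala.binary.version to the INVALID placeholders, a Scala profile has to be chosen explicitly on every build; the scala-2.11 and scala-2.12 profiles above supply the real versions. A minimal sketch of such an invocation from the repository root, under the assumption that a plain "mvn clean install" is the usual entry point:

    # pick exactly one Scala profile; the INVALID placeholder versions in the
    # parent pom are meant to stop builds that select no profile at all
    mvn clean install -DskipTests -Pscala-2.12

    # Scala 2.11 build; this profile also re-adds the flink-scala-shell module
    mvn clean install -DskipTests -Pscala-2.11
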
diff --git a/tools/check_dependency_convergence.sh b/tools/check_dependency_convergence.sh
index 5db8d9d95b1..a9294b841ed 100755
--- a/tools/check_dependency_convergence.sh
+++ b/tools/check_dependency_convergence.sh
@@ -43,6 +43,18 @@ modules=$(find . -maxdepth 3 -name 'pom.xml' -printf '%h\n' | sort -u | grep "fl
 
 for module in ${modules}
 do
+    # There are no Scala 2.12 dependencies for older Kafka versions
+    if [[ $PROFILE == *"scala-2.12"* ]]; then
+        if [[ $module == *"kafka-0.8"* ]]; then
+            echo "excluding ${module} because we build for Scala 2.12"
+            continue 2
+        fi
+        if [[ $module == *"kafka-0.9"* ]]; then
+            echo "excluding ${module} because we build for Scala 2.12"
+            continue 2
+        fi
+    fi
+
     # we are only interested in child modules
     for other_module in ${modules}
     do 
@@ -54,7 +66,7 @@ do
     
     cd "${module}"
     echo "checking ${module}"
-    output=$(mvn validate -nsu -Dcheckstyle.skip=true -Dcheck-convergence)
+    output=$(mvn validate $PROFILE -nsu -Dcheckstyle.skip=true -Dcheck-convergence)
     exit_code=$?
     if [[ ${exit_code} != 0 ]]; then
         echo "dependency convergence failed."
diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh
index 4dbc6fab8c5..c5f0ad36ef2 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -69,7 +69,16 @@ make_binary_release() {
     dir_name="flink-$RELEASE_VERSION-bin-$NAME-scala_${SCALA_VERSION}"
   fi
 
+  if [ $SCALA_VERSION = "2.12" ]; then
+      FLAGS="$FLAGS -Pscala-2.12"
+  elif [ $SCALA_VERSION = "2.11" ]; then
+      FLAGS="$FLAGS -Pscala-2.11"
+  else
+      echo "Invalid Scala version ${SCALA_VERSION}"
+  fi
+
   # enable release profile here (to check for the maven version)
+  tools/change-scala-version.sh ${SCALA_VERSION}
   $MVN clean package $FLAGS -Prelease -pl flink-shaded-hadoop/flink-shaded-hadoop2-uber,flink-dist -am -Dgpg.skip -Dcheckstyle.skip=true -DskipTests -Dmaven.test.skip=true
 
   cd flink-dist/target/flink-*-bin/
@@ -88,6 +97,11 @@ make_binary_release() {
 }
 
 if [ "$SCALA_VERSION" == "none" ] && [ "$HADOOP_VERSION" == "none" ]; then
+  make_binary_release "" "-DwithoutHadoop" "2.12"
+  make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "2.12"
+  make_binary_release "hadoop26" "-Dhadoop.version=2.6.5" "2.12"
+  make_binary_release "hadoop27" "-Dhadoop.version=2.7.5" "2.12"
+  make_binary_release "hadoop28" "-Dhadoop.version=2.8.3" "2.12"
   make_binary_release "" "-DwithoutHadoop" "2.11"
   make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "2.11"
   make_binary_release "hadoop26" "-Dhadoop.version=2.6.5" "2.11"
diff --git a/tools/releasing/deploy_staging_jars.sh b/tools/releasing/deploy_staging_jars.sh
index 1bc20d06269..07fe97131e2 100755
--- a/tools/releasing/deploy_staging_jars.sh
+++ b/tools/releasing/deploy_staging_jars.sh
@@ -40,6 +40,11 @@ cd ..
 
 echo "Deploying to repository.apache.org"
 
+COMMON_OPTIONS="-Prelease,docs-and-source -DskipTests -DretryFailedDeploymentCount=10"
+
 echo "Deploying Scala 2.11 version"
-$MVN clean deploy -Prelease,docs-and-source -DskipTests -DretryFailedDeploymentCount=10
+$MVN clean deploy $COMMON_OPTIONS -Pscala-2.11
+
+echo "Deploying Scala 2.12 version"
+$MVN clean deploy $COMMON_OPTIONS -Pscala-2.12
 
diff --git a/tools/travis/stage.sh b/tools/travis/stage.sh
index 3b2252ed7dd..53f6beee5ef 100644
--- a/tools/travis/stage.sh
+++ b/tools/travis/stage.sh
@@ -35,7 +35,6 @@ flink-optimizer,\
 flink-runtime,\
 flink-runtime-web,\
 flink-scala,\
-flink-scala-shell,\
 flink-streaming-java,\
 flink-streaming-scala"
 
@@ -74,7 +73,6 @@ flink-connectors/flink-connector-elasticsearch5,\
 flink-connectors/flink-connector-elasticsearch6,\
 flink-connectors/flink-connector-elasticsearch-base,\
 flink-connectors/flink-connector-filesystem,\
-flink-connectors/flink-connector-kafka-0.8,\
 flink-connectors/flink-connector-kafka-0.9,\
 flink-connectors/flink-connector-kafka-0.10,\
 flink-connectors/flink-connector-kafka-0.11,\
@@ -90,6 +88,16 @@ if [[ ${PROFILE} == *"include-kinesis"* ]]; then
     MODULES_CONNECTORS="$MODULES_CONNECTORS,flink-connectors/flink-connector-kinesis"
 fi
 
+# we can only build the Kafka 0.8 connector when building for Scala 2.11
+if [[ $PROFILE == *"scala-2.11"* ]]; then
+    MODULES_CONNECTORS="$MODULES_CONNECTORS,flink-connectors/flink-connector-kafka-0.8"
+fi
+
+# we can only build the Scala Shell when building for Scala 2.11
+if [[ $PROFILE == *"scala-2.11"* ]]; then
+    MODULES_CORE="$MODULES_CORE,flink-scala-shell"
+fi
+
 function get_compile_modules_for_stage() {
     local stage=$1
 


 
