You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/24 15:44:48 UTC

[1/2] flink git commit: [FLINK-3064] [core] Add size check in GroupReduceOperatorBase

Repository: flink
Updated Branches:
  refs/heads/master 54e642b31 -> d2b4391f4


[FLINK-3064] [core] Add size check in GroupReduceOperatorBase

This closes #1396.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2b4391f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2b4391f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2b4391f

Branch: refs/heads/master
Commit: d2b4391f4e42a96a8b3c852e303aba170de15727
Parents: 66b5def
Author: Martin Junghanns <m....@mailbox.org>
Authored: Mon Nov 23 20:07:43 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 24 15:44:20 2015 +0100

----------------------------------------------------------------------
 .../operators/base/GroupReduceOperatorBase.java | 38 ++++++++++----------
 1 file changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2b4391f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 3056fe7..fbb802d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -195,28 +195,30 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		
 		ArrayList<OUT> result = new ArrayList<OUT>();
 
-		if (keyColumns.length == 0) {
-			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-			List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
-			for (IN in: inputData) {
-				inputDataCopy.add(inputSerializer.copy(in));
-			}
-			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+		if (inputData.size() > 0) {
+			if (keyColumns.length == 0) {
+				final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+				List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
+				for (IN in : inputData) {
+					inputDataCopy.add(inputSerializer.copy(in));
+				}
+				CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
 
-			function.reduce(inputDataCopy, collector);
-		} else {
-			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
-			boolean[] keyOrderings = new boolean[keyColumns.length];
-			final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
+				function.reduce(inputDataCopy, collector);
+			} else {
+				final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+				boolean[] keyOrderings = new boolean[keyColumns.length];
+				final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
 
-			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
+				ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
 
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+				CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
 
-			while (keyedIterator.nextKey()) {
-				function.reduce(keyedIterator.getValues(), collector);
+				while (keyedIterator.nextKey()) {
+					function.reduce(keyedIterator.getValues(), collector);
+				}
 			}
 		}
 


[2/2] flink git commit: [streaming] [ml] Code cleanup: unused imports, fix javadocs

Posted by tr...@apache.org.
[streaming] [ml] Code cleanup: unused imports, fix javadocs

This closes #1391.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66b5def3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66b5def3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66b5def3

Branch: refs/heads/master
Commit: 66b5def320054776f09fb8a7f9b77be6f1572ae2
Parents: 54e642b
Author: smarthi <sm...@apache.org>
Authored: Sat Nov 21 17:14:43 2015 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 24 15:44:20 2015 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/flink/ml/MLUtils.scala  |  2 +-
 .../org/apache/flink/ml/classification/SVM.scala  | 18 +++++++-----------
 .../org/apache/flink/ml/math/SparseVector.scala   |  5 +----
 .../org/apache/flink/ml/math/VectorBuilder.scala  |  2 +-
 .../scala/org/apache/flink/ml/MLUtilsSuite.scala  |  2 +-
 .../flink/ml/classification/SVMITSuite.scala      |  2 +-
 .../apache/flink/ml/math/DenseMatrixSuite.scala   |  2 +-
 .../apache/flink/ml/math/SparseMatrixSuite.scala  |  4 ++--
 .../apache/flink/ml/math/SparseVectorSuite.scala  |  4 ++--
 .../flink/ml/recommendation/ALSITSuite.scala      |  9 ++++-----
 .../MultipleLinearRegressionITSuite.scala         | 16 +++++++---------
 .../streaming/api/scala/ConnectedStreams.scala    |  5 +++--
 .../flink/streaming/api/scala/DataStream.scala    |  3 +--
 .../api/scala/StreamExecutionEnvironment.scala    | 11 ++++-------
 .../scala/api/CsvOutputFormatITCase.java          |  2 +-
 .../scala/api/SocketOutputFormatITCase.java       |  3 +--
 .../streaming/api/scala/DataStreamTest.scala      |  7 +++----
 .../api/scala/OutputFormatTestPrograms.scala      |  3 ---
 .../scala/StreamingScalaAPICompletenessTest.scala |  2 +-
 19 files changed, 42 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
index a327ddc..804ab5f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
@@ -47,7 +47,7 @@ object MLUtils {
     * Since the libSVM/SVMLight format stores a vector in its sparse form, the [[LabeledVector]]
     * will also be instantiated with a [[SparseVector]].
     *
-    * @param env [[ExecutionEnvironment]]
+    * @param env executionEnvironment [[ExecutionEnvironment]]
     * @param filePath Path to the input file
     * @return [[DataSet]] of [[LabeledVector]] containing the information of the libSVM/SVMLight
     *        file

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
index abf1696..4a780e9 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -18,23 +18,19 @@
 
 package org.apache.flink.ml.classification
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.ml.pipeline.{PredictOperation, FitOperation, PredictDataSetOperation,
-Predictor}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml._
 import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
 import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.{DenseVector, Vector}
 import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
-import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
+import breeze.linalg.{DenseVector => BreezeDenseVector, Vector => BreezeVector}
 
 /** Implements a soft-margin SVM using the communication-efficient distributed dual coordinate
   * ascent algorithm (CoCoA) with hinge-loss function.
@@ -538,7 +534,7 @@ object SVM{
     if(scala.math.abs(grad) != 0.0){
       val qii = x dot x
       val newAlpha = if(qii != 0.0){
-        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
+        scala.math.min(scala.math.max(alpha - (grad / qii), 0.0), 1.0)
       } else {
         1.0
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
index 8ad0369..5a78beb 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
@@ -25,10 +25,7 @@ import scala.util.Sorting
 /** Sparse vector implementation storing the data in two arrays. One index contains the sorted
   * indices of the non-zero vector entries and the other the corresponding vector entries
   */
-case class SparseVector(
-    val size: Int,
-    val indices: Array[Int],
-    val data: Array[Double])
+case class SparseVector(size: Int, indices: Array[Int], data: Array[Double])
   extends Vector
   with Serializable {
   /** Updates the element at the given index with the provided value

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
index 79c7005..3bbf146 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
@@ -44,7 +44,7 @@ object VectorBuilder{
   implicit val sparseVectorBuilder = new VectorBuilder[SparseVector] {
     override def build(data: List[Double]): SparseVector = {
       // Enrich elements with explicit indices and filter out zero entries
-      SparseVector.fromCOO(data.length, (0 until data.length).zip(data).filter(_._2 != 0.0))
+      SparseVector.fromCOO(data.length, data.indices.zip(data).filter(_._2 != 0.0))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
index 1464d07..d896937 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
@@ -86,7 +86,7 @@ class MLUtilsSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     val labeledVectorsDS = env.fromCollection(labeledVectors)
 
-    val tempDir = new File(System.getProperty("java.io.tmpdir"));
+    val tempDir = new File(System.getProperty("java.io.tmpdir"))
 
     val tempFile = new File(tempDir, TestFileUtils.randomFileName())
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
index 57a7783..e6eb873 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.ml.classification
 
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import org.apache.flink.ml.math.DenseVector
 
 import org.apache.flink.api.scala._
 import org.apache.flink.test.util.FlinkTestBase

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
index ca3d601..88bde3b 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
@@ -81,6 +81,6 @@ class DenseMatrixSuite extends FlatSpec with Matchers {
 
     copy(0, 0) = 1
 
-    denseMatrix should not equal(copy)
+    denseMatrix should not equal copy
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
index 74f2ccf..970ea4b 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
@@ -66,7 +66,7 @@ class SparseMatrixSuite extends FlatSpec with Matchers {
       groupBy{_._1}.
       mapValues{
       entries =>
-        entries.map(_._2).reduce(_ + _)
+        entries.map(_._2).sum
     }
 
     for(row <- 0 until numRows; col <- 0 until numCols) {
@@ -129,6 +129,6 @@ class SparseMatrixSuite extends FlatSpec with Matchers {
 
     copy(2, 3) = 2
 
-    sparseMatrix should not equal(copy)
+    sparseMatrix should not equal copy
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
index 5ed0b37..5e7e4d7 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
@@ -56,7 +56,7 @@ class SparseVectorSuite extends FlatSpec with Matchers {
       groupBy{_._1}.
       mapValues{
       entries =>
-        entries.map(_._2).reduce(_ + _)
+        entries.map(_._2).sum
     }
 
     for(index <- 0 until size) {
@@ -95,7 +95,7 @@ class SparseVectorSuite extends FlatSpec with Matchers {
 
     copy(3) = 3
 
-    sparseVector should not equal(copy)
+    sparseVector should not equal copy
   }
 
   it should "calculate dot product with SparseVector" in {

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
index 2ad310d..9c241fd 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -18,11 +18,10 @@
 
 package org.apache.flink.ml.recommendation
 
-import scala.language.postfixOps
-
 import org.scalatest._
 
-import org.apache.flink.api.scala.ExecutionEnvironment
+import scala.language.postfixOps
+
 import org.apache.flink.api.scala._
 import org.apache.flink.test.util.FlinkTestBase
 
@@ -64,13 +63,13 @@ class ALSITSuite
 
     predictions foreach {
       case (uID, iID, value) => {
-        resultMap.isDefinedAt(((uID, iID))) should be(true)
+        resultMap.isDefinedAt((uID, iID)) should be(true)
 
         value should be(resultMap((uID, iID)) +- 0.1)
       }
     }
 
-    val risk = als.empiricalRisk(inputDS).collect().apply(0)
+    val risk = als.empiricalRisk(inputDS).collect().head
 
     risk should be(expectedEmpiricalRisk +- 1)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index 4e78ba5..17b8a85 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -18,13 +18,11 @@
 
 package org.apache.flink.ml.regression
 
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.ml.common.{WeightVector, ParameterMap}
-import org.apache.flink.ml.preprocessing.PolynomialFeatures
-import org.scalatest.{Matchers, FlatSpec}
-
 import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{ParameterMap, WeightVector}
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
 import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
 
 class MultipleLinearRegressionITSuite
   extends FlatSpec
@@ -55,7 +53,7 @@ class MultipleLinearRegressionITSuite
 
     weightList.size should equal(1)
 
-    val WeightVector(weights, intercept) = weightList(0)
+    val WeightVector(weights, intercept) = weightList.head
 
     expectedWeights.toIterator zip weights.valueIterator foreach {
       case (expectedWeight, weight) =>
@@ -63,7 +61,7 @@ class MultipleLinearRegressionITSuite
     }
     intercept should be (expectedWeight0 +- 0.4)
 
-    val srs = mlr.squaredResidualSum(inputDS).collect().apply(0)
+    val srs = mlr.squaredResidualSum(inputDS).collect().head
 
     srs should be (expectedSquaredResidualSum +- 2)
   }
@@ -91,7 +89,7 @@ class MultipleLinearRegressionITSuite
 
     weightList.size should equal(1)
 
-    val WeightVector(weights, intercept) = weightList(0)
+    val WeightVector(weights, intercept) = weightList.head
 
     RegressionData.expectedPolynomialWeights.toIterator.zip(weights.valueIterator) foreach {
       case (expectedWeight, weight) =>
@@ -102,7 +100,7 @@ class MultipleLinearRegressionITSuite
 
     val transformedInput = polynomialBase.transform(inputDS, parameters)
 
-    val srs = mlr.squaredResidualSum(transformedInput).collect().apply(0)
+    val srs = mlr.squaredResidualSum(transformedInput).collect().head
 
     srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 3ff773f..6181db4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.{TypeExtractor, ResultTypeQueryable}
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, KeyedStream => JKeyedStream}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
 import org.apache.flink.util.Collector
+
 import scala.reflect.ClassTag
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 6855e00..6a3118a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.AbstractTime
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f632240..14f23e1 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,20 +18,17 @@
 
 package org.apache.flink.streaming.api.scala
 
-import java.util.Objects
-import java.util.Objects._
-
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateBackend
-import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.SplittableIterator
 
@@ -196,11 +193,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * functions (implementing the interface 
    * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]].
    *
-   * <p>The [[org.apache.flink.streaming.api.state.memory.MemoryStateBackend]] for example
+   * <p>The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] for example
    * maintains the state in heap memory, as objects. It is lightweight without extra 
    * dependencies, but can checkpoint only small states (some counters).
    *
-   * <p>In contrast, the [[org.apache.flink.streaming.api.state.filesystem.FsStateBackend]]
+   * <p>In contrast, the [[org.apache.flink.runtime.state.filesystem.FsStateBackend]]
    * stores checkpoints of the state (also maintained as heap objects) in files. When using
    * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
    * that state is not lost upon failures of individual nodes and that the entire streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
index 0c60719..a389e51 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -57,7 +57,7 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
+					out.collect(new Tuple2<>(token, 1));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
index a2a78b7..7b3ed67 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.scala.api;
 
 import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
 import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
 import org.apache.flink.test.testdata.WordCountData;
 import org.junit.Ignore;
 
 @Ignore
-//This test sometimes failes most likely due to the behaviour
+//This test sometimes fails most likely due to the behaviour
 //of the socket. Disabled for now.
 public class SocketOutputFormatITCase extends SocketOutputTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 988e7ec..f9d54a9 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import java.lang
+
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
@@ -26,13 +27,11 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
-
 import org.junit.Assert.fail
 import org.junit.Test
 
@@ -501,7 +500,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
     val sg = env.getStreamGraph
 
-    assert(sg.getIterationSourceSinkPairs().size() == 2)
+    assert(sg.getIterationSourceSinkPairs.size() == 2)
   }
 
   /////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index e09f164..2168bf7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -18,9 +18,6 @@
 package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
-import org.apache.flink.test.util.MultipleProgramsTestBase
 
 import scala.language.existentials
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 101f3b5..d584f07 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -97,7 +97,7 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       """^.*hashCode"""
     ).map(_.r)
     lazy val excludedByPattern =
-      excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
+      excludedPatterns.map(_.findFirstIn(name)).exists(_.isDefined)
     name.contains("$") || excludedNames.contains(name) || excludedByPattern
   }