You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/24 15:44:48 UTC
[1/2] flink git commit: [FLINK-3064] [core] Add size check in
GroupReduceOperatorBase
Repository: flink
Updated Branches:
refs/heads/master 54e642b31 -> d2b4391f4
[FLINK-3064] [core] Add size check in GroupReduceOperatorBase
This closes #1396.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2b4391f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2b4391f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2b4391f
Branch: refs/heads/master
Commit: d2b4391f4e42a96a8b3c852e303aba170de15727
Parents: 66b5def
Author: Martin Junghanns <m....@mailbox.org>
Authored: Mon Nov 23 20:07:43 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 24 15:44:20 2015 +0100
----------------------------------------------------------------------
.../operators/base/GroupReduceOperatorBase.java | 38 ++++++++++----------
1 file changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d2b4391f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 3056fe7..fbb802d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -195,28 +195,30 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
ArrayList<OUT> result = new ArrayList<OUT>();
- if (keyColumns.length == 0) {
- final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
- List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
- for (IN in: inputData) {
- inputDataCopy.add(inputSerializer.copy(in));
- }
- CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+ if (inputData.size() > 0) {
+ if (keyColumns.length == 0) {
+ final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+ List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
+ for (IN in : inputData) {
+ inputDataCopy.add(inputSerializer.copy(in));
+ }
+ CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
- function.reduce(inputDataCopy, collector);
- } else {
- final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
- boolean[] keyOrderings = new boolean[keyColumns.length];
- final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
+ function.reduce(inputDataCopy, collector);
+ } else {
+ final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+ boolean[] keyOrderings = new boolean[keyColumns.length];
+ final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
- ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
+ ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
- CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+ CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
- while (keyedIterator.nextKey()) {
- function.reduce(keyedIterator.getValues(), collector);
+ while (keyedIterator.nextKey()) {
+ function.reduce(keyedIterator.getValues(), collector);
+ }
}
}
[2/2] flink git commit: [streaming] [ml] Code cleanup: unused imports,
fix javadocs
Posted by tr...@apache.org.
[streaming] [ml] Code cleanup: unused imports, fix javadocs
This closes #1391.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66b5def3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66b5def3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66b5def3
Branch: refs/heads/master
Commit: 66b5def320054776f09fb8a7f9b77be6f1572ae2
Parents: 54e642b
Author: smarthi <sm...@apache.org>
Authored: Sat Nov 21 17:14:43 2015 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 24 15:44:20 2015 +0100
----------------------------------------------------------------------
.../main/scala/org/apache/flink/ml/MLUtils.scala | 2 +-
.../org/apache/flink/ml/classification/SVM.scala | 18 +++++++-----------
.../org/apache/flink/ml/math/SparseVector.scala | 5 +----
.../org/apache/flink/ml/math/VectorBuilder.scala | 2 +-
.../scala/org/apache/flink/ml/MLUtilsSuite.scala | 2 +-
.../flink/ml/classification/SVMITSuite.scala | 2 +-
.../apache/flink/ml/math/DenseMatrixSuite.scala | 2 +-
.../apache/flink/ml/math/SparseMatrixSuite.scala | 4 ++--
.../apache/flink/ml/math/SparseVectorSuite.scala | 4 ++--
.../flink/ml/recommendation/ALSITSuite.scala | 9 ++++-----
.../MultipleLinearRegressionITSuite.scala | 16 +++++++---------
.../streaming/api/scala/ConnectedStreams.scala | 5 +++--
.../flink/streaming/api/scala/DataStream.scala | 3 +--
.../api/scala/StreamExecutionEnvironment.scala | 11 ++++-------
.../scala/api/CsvOutputFormatITCase.java | 2 +-
.../scala/api/SocketOutputFormatITCase.java | 3 +--
.../streaming/api/scala/DataStreamTest.scala | 7 +++----
.../api/scala/OutputFormatTestPrograms.scala | 3 ---
.../scala/StreamingScalaAPICompletenessTest.scala | 2 +-
19 files changed, 42 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
index a327ddc..804ab5f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
@@ -47,7 +47,7 @@ object MLUtils {
* Since the libSVM/SVMLight format stores a vector in its sparse form, the [[LabeledVector]]
* will also be instantiated with a [[SparseVector]].
*
- * @param env [[ExecutionEnvironment]]
+ * @param env executionEnvironment [[ExecutionEnvironment]]
* @param filePath Path to the input file
* @return [[DataSet]] of [[LabeledVector]] containing the information of the libSVM/SVMLight
* file
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
index abf1696..4a780e9 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -18,23 +18,19 @@
package org.apache.flink.ml.classification
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.ml.pipeline.{PredictOperation, FitOperation, PredictDataSetOperation,
-Predictor}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml._
import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
-import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
+import breeze.linalg.{DenseVector => BreezeDenseVector, Vector => BreezeVector}
/** Implements a soft-margin SVM using the communication-efficient distributed dual coordinate
* ascent algorithm (CoCoA) with hinge-loss function.
@@ -538,7 +534,7 @@ object SVM{
if(scala.math.abs(grad) != 0.0){
val qii = x dot x
val newAlpha = if(qii != 0.0){
- scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
+ scala.math.min(scala.math.max(alpha - (grad / qii), 0.0), 1.0)
} else {
1.0
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
index 8ad0369..5a78beb 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
@@ -25,10 +25,7 @@ import scala.util.Sorting
/** Sparse vector implementation storing the data in two arrays. One index contains the sorted
* indices of the non-zero vector entries and the other the corresponding vector entries
*/
-case class SparseVector(
- val size: Int,
- val indices: Array[Int],
- val data: Array[Double])
+case class SparseVector(size: Int, indices: Array[Int], data: Array[Double])
extends Vector
with Serializable {
/** Updates the element at the given index with the provided value
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
index 79c7005..3bbf146 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
@@ -44,7 +44,7 @@ object VectorBuilder{
implicit val sparseVectorBuilder = new VectorBuilder[SparseVector] {
override def build(data: List[Double]): SparseVector = {
// Enrich elements with explicit indices and filter out zero entries
- SparseVector.fromCOO(data.length, (0 until data.length).zip(data).filter(_._2 != 0.0))
+ SparseVector.fromCOO(data.length, data.indices.zip(data).filter(_._2 != 0.0))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
index 1464d07..d896937 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
@@ -86,7 +86,7 @@ class MLUtilsSuite extends FlatSpec with Matchers with FlinkTestBase {
val labeledVectorsDS = env.fromCollection(labeledVectors)
- val tempDir = new File(System.getProperty("java.io.tmpdir"));
+ val tempDir = new File(System.getProperty("java.io.tmpdir"))
val tempFile = new File(tempDir, TestFileUtils.randomFileName())
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
index 57a7783..e6eb873 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
@@ -19,7 +19,7 @@
package org.apache.flink.ml.classification
import org.scalatest.{FlatSpec, Matchers}
-import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import org.apache.flink.ml.math.DenseVector
import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
index ca3d601..88bde3b 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseMatrixSuite.scala
@@ -81,6 +81,6 @@ class DenseMatrixSuite extends FlatSpec with Matchers {
copy(0, 0) = 1
- denseMatrix should not equal(copy)
+ denseMatrix should not equal copy
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
index 74f2ccf..970ea4b 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
@@ -66,7 +66,7 @@ class SparseMatrixSuite extends FlatSpec with Matchers {
groupBy{_._1}.
mapValues{
entries =>
- entries.map(_._2).reduce(_ + _)
+ entries.map(_._2).sum
}
for(row <- 0 until numRows; col <- 0 until numCols) {
@@ -129,6 +129,6 @@ class SparseMatrixSuite extends FlatSpec with Matchers {
copy(2, 3) = 2
- sparseMatrix should not equal(copy)
+ sparseMatrix should not equal copy
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
index 5ed0b37..5e7e4d7 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
@@ -56,7 +56,7 @@ class SparseVectorSuite extends FlatSpec with Matchers {
groupBy{_._1}.
mapValues{
entries =>
- entries.map(_._2).reduce(_ + _)
+ entries.map(_._2).sum
}
for(index <- 0 until size) {
@@ -95,7 +95,7 @@ class SparseVectorSuite extends FlatSpec with Matchers {
copy(3) = 3
- sparseVector should not equal(copy)
+ sparseVector should not equal copy
}
it should "calculate dot product with SparseVector" in {
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
index 2ad310d..9c241fd 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -18,11 +18,10 @@
package org.apache.flink.ml.recommendation
-import scala.language.postfixOps
-
import org.scalatest._
-import org.apache.flink.api.scala.ExecutionEnvironment
+import scala.language.postfixOps
+
import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase
@@ -64,13 +63,13 @@ class ALSITSuite
predictions foreach {
case (uID, iID, value) => {
- resultMap.isDefinedAt(((uID, iID))) should be(true)
+ resultMap.isDefinedAt((uID, iID)) should be(true)
value should be(resultMap((uID, iID)) +- 0.1)
}
}
- val risk = als.empiricalRisk(inputDS).collect().apply(0)
+ val risk = als.empiricalRisk(inputDS).collect().head
risk should be(expectedEmpiricalRisk +- 1)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index 4e78ba5..17b8a85 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -18,13 +18,11 @@
package org.apache.flink.ml.regression
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.ml.common.{WeightVector, ParameterMap}
-import org.apache.flink.ml.preprocessing.PolynomialFeatures
-import org.scalatest.{Matchers, FlatSpec}
-
import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{ParameterMap, WeightVector}
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
class MultipleLinearRegressionITSuite
extends FlatSpec
@@ -55,7 +53,7 @@ class MultipleLinearRegressionITSuite
weightList.size should equal(1)
- val WeightVector(weights, intercept) = weightList(0)
+ val WeightVector(weights, intercept) = weightList.head
expectedWeights.toIterator zip weights.valueIterator foreach {
case (expectedWeight, weight) =>
@@ -63,7 +61,7 @@ class MultipleLinearRegressionITSuite
}
intercept should be (expectedWeight0 +- 0.4)
- val srs = mlr.squaredResidualSum(inputDS).collect().apply(0)
+ val srs = mlr.squaredResidualSum(inputDS).collect().head
srs should be (expectedSquaredResidualSum +- 2)
}
@@ -91,7 +89,7 @@ class MultipleLinearRegressionITSuite
weightList.size should equal(1)
- val WeightVector(weights, intercept) = weightList(0)
+ val WeightVector(weights, intercept) = weightList.head
RegressionData.expectedPolynomialWeights.toIterator.zip(weights.valueIterator) foreach {
case (expectedWeight, weight) =>
@@ -102,7 +100,7 @@ class MultipleLinearRegressionITSuite
val transformedInput = polynomialBase.transform(inputDS, parameters)
- val srs = mlr.squaredResidualSum(transformedInput).collect().apply(0)
+ val srs = mlr.squaredResidualSum(transformedInput).collect().head
srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 3ff773f..6181db4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.{TypeExtractor, ResultTypeQueryable}
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, KeyedStream => JKeyedStream}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
import org.apache.flink.util.Collector
+
import scala.reflect.ClassTag
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 6855e00..6a3118a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, RichFilterFunction, RichFlatMapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.AbstractTime
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f632240..14f23e1 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,20 +18,17 @@
package org.apache.flink.streaming.api.scala
-import java.util.Objects
-import java.util.Objects._
-
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.runtime.state.StateBackend
-import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.types.StringValue
import org.apache.flink.util.SplittableIterator
@@ -196,11 +193,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* functions (implementing the interface
* [[org.apache.flink.streaming.api.checkpoint.Checkpointed]].
*
- * <p>The [[org.apache.flink.streaming.api.state.memory.MemoryStateBackend]] for example
+ * <p>The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] for example
* maintains the state in heap memory, as objects. It is lightweight without extra
* dependencies, but can checkpoint only small states (some counters).
*
- * <p>In contrast, the [[org.apache.flink.streaming.api.state.filesystem.FsStateBackend]]
+ * <p>In contrast, the [[org.apache.flink.runtime.state.filesystem.FsStateBackend]]
* stores checkpoints of the state (also maintained as heap objects) in files. When using
* a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
* that state is not lost upon failures of individual nodes and that the entire streaming
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
index 0c60719..a389e51 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -57,7 +57,7 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
+ out.collect(new Tuple2<>(token, 1));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
index a2a78b7..7b3ed67 100644
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
+++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.scala.api;
import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
import org.apache.flink.test.testdata.WordCountData;
import org.junit.Ignore;
@Ignore
-//This test sometimes failes most likely due to the behaviour
+//This test sometimes fails most likely due to the behaviour
//of the socket. Disabled for now.
public class SocketOutputFormatITCase extends SocketOutputTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 988e7ec..f9d54a9 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.scala
import java.lang
+
import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.collector.selector.OutputSelector
@@ -26,13 +27,11 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.runtime.partitioner._
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
-
import org.junit.Assert.fail
import org.junit.Test
@@ -501,7 +500,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val sg = env.getStreamGraph
- assert(sg.getIterationSourceSinkPairs().size() == 2)
+ assert(sg.getIterationSourceSinkPairs.size() == 2)
}
/////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index e09f164..2168bf7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -18,9 +18,6 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
-import org.apache.flink.test.util.MultipleProgramsTestBase
import scala.language.existentials
http://git-wip-us.apache.org/repos/asf/flink/blob/66b5def3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 101f3b5..d584f07 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -97,7 +97,7 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"""^.*hashCode"""
).map(_.r)
lazy val excludedByPattern =
- excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
+ excludedPatterns.map(_.findFirstIn(name)).exists(_.isDefined)
name.contains("$") || excludedNames.contains(name) || excludedByPattern
}