You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/25 19:56:09 UTC
[4/4] flink git commit: [hotfix] Fix testing setup of
StreamingOperatorsITCase via ScalaStreamingMultipleProgramsTestBase
[hotfix] Fix testing setup of StreamingOperatorsITCase via ScalaStreamingMultipleProgramsTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/082f892a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/082f892a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/082f892a
Branch: refs/heads/master
Commit: 082f892ad9d31ac8579058638d4c6e1f43b7e380
Parents: 1542260
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 25 20:16:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 25 20:16:59 2017 +0100
----------------------------------------------------------------------
.../ScalaStreamingMultipleProgramsTestBase.scala | 16 ++++++++++------
.../api/scala/StreamingOperatorsITCase.scala | 2 +-
2 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/082f892a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index 29b3a3e..d9f727c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -21,18 +21,20 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.test.util.TestBaseUtils
-import org.scalatest.BeforeAndAfterAll
+
+import org.junit.{After, Before}
+
import org.scalatest.junit.JUnitSuiteLike
trait ScalaStreamingMultipleProgramsTestBase
extends TestBaseUtils
- with JUnitSuiteLike
- with BeforeAndAfterAll {
+ with JUnitSuiteLike {
val parallelism = 4
var cluster: Option[LocalFlinkMiniCluster] = None
- override protected def beforeAll(): Unit = {
+ @Before
+ def beforeAll(): Unit = {
val cluster = Some(
TestBaseUtils.startCluster(
1,
@@ -43,10 +45,12 @@ trait ScalaStreamingMultipleProgramsTestBase
)
)
- val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism)
+ TestStreamEnvironment.setAsContext(cluster.get, parallelism)
}
- override protected def afterAll(): Unit = {
+ @After
+ def afterAll(): Unit = {
+ TestStreamEnvironment.unsetAsContext()
cluster.foreach {
TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/082f892a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index e08e0b5..4d690c8 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -69,7 +69,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
- env.getConfig.setMaxParallelism(2);
+ env.getConfig.setMaxParallelism(2)
val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {