You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/25 19:56:09 UTC

[4/4] flink git commit: [hotfix] Fix testing setup of StreamingOperatorsITCase via ScalaStreamingMultipleProgramsTestBase

[hotfix] Fix testing setup of StreamingOperatorsITCase via ScalaStreamingMultipleProgramsTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/082f892a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/082f892a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/082f892a

Branch: refs/heads/master
Commit: 082f892ad9d31ac8579058638d4c6e1f43b7e380
Parents: 1542260
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 25 20:16:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 25 20:16:59 2017 +0100

----------------------------------------------------------------------
 .../ScalaStreamingMultipleProgramsTestBase.scala    | 16 ++++++++++------
 .../api/scala/StreamingOperatorsITCase.scala        |  2 +-
 2 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/082f892a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index 29b3a3e..d9f727c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -21,18 +21,20 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.streaming.util.TestStreamEnvironment
 import org.apache.flink.test.util.TestBaseUtils
-import org.scalatest.BeforeAndAfterAll
+
+import org.junit.{After, Before}
+
 import org.scalatest.junit.JUnitSuiteLike
 
 trait ScalaStreamingMultipleProgramsTestBase
   extends TestBaseUtils
-  with  JUnitSuiteLike
-  with BeforeAndAfterAll {
+  with  JUnitSuiteLike {
 
   val parallelism = 4
   var cluster: Option[LocalFlinkMiniCluster] = None
 
-  override protected def beforeAll(): Unit = {
+  @Before
+  def beforeAll(): Unit = {
     val cluster = Some(
       TestBaseUtils.startCluster(
         1,
@@ -43,10 +45,12 @@ trait ScalaStreamingMultipleProgramsTestBase
       )
     )
 
-    val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism)
+    TestStreamEnvironment.setAsContext(cluster.get, parallelism)
   }
 
-  override protected def afterAll(): Unit = {
+  @After
+  def afterAll(): Unit = {
+    TestStreamEnvironment.unsetAsContext()
     cluster.foreach {
       TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/082f892a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index e08e0b5..4d690c8 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -69,7 +69,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     env.setParallelism(2)
-    env.getConfig.setMaxParallelism(2);
+    env.getConfig.setMaxParallelism(2)
 
     val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {