You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/18 14:11:56 UTC

flink git commit: [FLINK-7212][tests] re-enable JobManagerLeaderSessionIDITCase

Repository: flink
Updated Branches:
  refs/heads/master 67fb2f3ab -> 673a883ed


[FLINK-7212][tests] re-enable JobManagerLeaderSessionIDITCase

This test was previously named JobManagerLeaderSessionIDITSuite and has not
been executed for a while by maven because of it having the wrong naming
scheme. After a renaming, it runs again but needed a minor change to be
successful again which is included in this commit.

This closes #4354.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/673a883e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/673a883e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/673a883e

Branch: refs/heads/master
Commit: 673a883ed89098127c7b7a67216d4399885a7fce
Parents: 67fb2f3
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 17 14:46:17 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 18 16:09:33 2017 +0200

----------------------------------------------------------------------
 .../JobManagerLeaderSessionIDITCase.scala       | 106 ++++++++++++++++++
 .../JobManagerLeaderSessionIDITSuite.scala      | 110 -------------------
 2 files changed, 106 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/673a883e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITCase.scala
new file mode 100644
index 0000000..b57c0da
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITCase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.runtime.jobmanager
+
+import java.util.UUID
+
+import akka.actor.ActorSystem
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning, WaitForAllVerticesToBeRunning}
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class JobManagerLeaderSessionIDITCase(_system: ActorSystem)
+  extends TestKit(_system)
+  with ImplicitSender
+  with FunSuiteLike
+  with Matchers
+  with BeforeAndAfterAll
+  with ScalaTestingUtils {
+
+  val numTaskManagers = 2
+  val taskManagerNumSlots = 2
+  val numSlots = numTaskManagers * taskManagerNumSlots
+
+  val cluster = TestingUtils.startTestingCluster(
+    taskManagerNumSlots,
+    numTaskManagers,
+    TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
+
+  def this() = this(ActorSystem("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig))
+
+  override def afterAll(): Unit = {
+    cluster.stop()
+    TestKit.shutdownActorSystem(system)
+  }
+
+  test("A JobManager should not process CancelJob messages with the wrong leader session ID") {
+    val sender = new JobVertex("BlockingSender");
+    sender.setParallelism(numSlots)
+    sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable])
+    val jobGraph = new JobGraph("TestJob", sender)
+
+    val oldSessionID = UUID.randomUUID()
+
+    val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
+    val jm = jmGateway.actor()
+
+    within(TestingUtils.TESTING_DURATION) {
+      jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+
+      expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+
+      jmGateway.tell(WaitForAllVerticesToBeRunning(jobGraph.getJobID), self)
+
+      expectMsg(AllVerticesRunning(jobGraph.getJobID))
+
+      jm ! LeaderSessionMessage(oldSessionID, CancelJob(jobGraph.getJobID))
+
+      BlockingUntilSignalNoOpInvokable.triggerExecution()
+
+      expectMsgType[JobResultSuccess]
+    }
+  }
+}
+
+class BlockingUntilSignalNoOpInvokable extends AbstractInvokable {
+
+  override def invoke(): Unit = {
+    BlockingUntilSignalNoOpInvokable.lock.synchronized{
+      BlockingUntilSignalNoOpInvokable.lock.wait()
+    }
+  }
+}
+
+object BlockingUntilSignalNoOpInvokable {
+  val lock = new Object
+
+  def triggerExecution(): Unit = {
+    lock.synchronized{
+      lock.notifyAll()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/673a883e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
deleted file mode 100644
index 78bc0ee..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerLeaderSessionIDITSuite.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.runtime.jobmanager
-
-import java.util.UUID
-
-import akka.actor.ActorSystem
-import akka.actor.Status.Success
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
-import org.apache.flink.runtime.messages.JobManagerMessages.{LeaderSessionMessage, CancelJob,
-JobResultSuccess, SubmitJob}
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
-WaitForAllVerticesToBeRunning}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.junit.runner.RunWith
-import org.scalatest.{FunSuiteLike, Matchers, BeforeAndAfterAll}
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class JobManagerLeaderSessionIDITSuite(_system: ActorSystem)
-  extends TestKit(_system)
-  with ImplicitSender
-  with FunSuiteLike
-  with Matchers
-  with BeforeAndAfterAll
-  with ScalaTestingUtils {
-
-  val numTaskManagers = 2
-  val taskManagerNumSlots = 2
-  val numSlots = numTaskManagers * taskManagerNumSlots
-
-  val cluster = TestingUtils.startTestingCluster(
-    taskManagerNumSlots,
-    numTaskManagers,
-    TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
-
-  def this() = this(ActorSystem("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig))
-
-  override def afterAll(): Unit = {
-    cluster.stop()
-    TestKit.shutdownActorSystem(system)
-  }
-
-  test("A JobManager should not process CancelJob messages with the wrong leader session ID") {
-    val sender = new JobVertex("BlockingSender");
-    sender.setParallelism(numSlots)
-    sender.setInvokableClass(classOf[BlockingUntilSignalNoOpInvokable])
-    val jobGraph = new JobGraph("TestJob", sender)
-
-    val oldSessionID = UUID.randomUUID()
-
-    val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-    val jm = jmGateway.actor()
-
-    within(TestingUtils.TESTING_DURATION) {
-      jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
-
-      expectMsg(Success(jobGraph.getJobID))
-
-      jmGateway.tell(WaitForAllVerticesToBeRunning(jobGraph.getJobID), self)
-
-      expectMsg(AllVerticesRunning(jobGraph.getJobID))
-
-      jm ! LeaderSessionMessage(oldSessionID, CancelJob(jobGraph.getJobID))
-
-      BlockingUntilSignalNoOpInvokable.triggerExecution()
-
-      expectMsgType[JobResultSuccess]
-    }
-  }
-}
-
-class BlockingUntilSignalNoOpInvokable extends AbstractInvokable {
-
-  override def invoke(): Unit = {
-    BlockingUntilSignalNoOpInvokable.lock.synchronized{
-      BlockingUntilSignalNoOpInvokable.lock.wait()
-    }
-  }
-}
-
-object BlockingUntilSignalNoOpInvokable {
-  val lock = new Object
-
-  def triggerExecution(): Unit = {
-    lock.synchronized{
-      lock.notifyAll()
-    }
-  }
-}