You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/19 17:33:40 UTC

flink git commit: [tests] Replaces Scala mixins for the Job/TaskManager with classes which extend the respective classes. Adds proper logger registration for sub-classes of FlinkUntypedActor.

Repository: flink
Updated Branches:
  refs/heads/master 8602b7af0 -> baaa3daca


[tests] Replaces Scala mixins for the Job/TaskManager with classes which extend the respective classes. Adds proper logger registration for sub-classes of FlinkUntypedActor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baaa3dac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baaa3dac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baaa3dac

Branch: refs/heads/master
Commit: baaa3dacabdfa87c3c11882810e05dc384f24038
Parents: 8602b7a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 19 16:16:31 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 19 16:16:31 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/akka/FlinkUntypedActor.java   |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  2 -
 .../runtime/testingUtils/TestingCluster.scala   |  9 ++--
 .../testingUtils/TestingJobManager.scala        | 49 +++++++++++++++++---
 .../testingUtils/TestingMemoryArchivist.scala   | 12 ++---
 .../test/util/ForkableFlinkMiniCluster.scala    | 14 ++----
 6 files changed, 57 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index bba2aeb..1456758 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -39,7 +39,7 @@ import java.util.UUID;
  * a leader session ID option which is returned by getLeaderSessionID.
  */
 public abstract class FlinkUntypedActor extends UntypedActor {
-	protected static Logger LOG = LoggerFactory.getLogger(FlinkUntypedActor.class);
+	protected Logger LOG = LoggerFactory.getLogger(getClass());
 
 	/**
 	 * This method is called by Akka if a new message has arrived for the actor. It logs the

http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5c0f468..92688fa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,7 +25,6 @@ import java.util.{UUID, Collections}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
-import _root_.akka.pattern.ask
 
 import grizzled.slf4j.Logger
 
@@ -69,7 +68,6 @@ import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the

http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index f5a506d..057ffeb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -80,11 +80,11 @@ class TestingCluster(userConfiguration: Configuration,
       timeout,
       archiveCount) = JobManager.createJobManagerComponents(configuration)
     
-    val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
+    val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
     val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
     
     val jobManagerProps = Props(
-      new JobManager(
+      new TestingJobManager(
         configuration,
         executionContext,
         instanceManager,
@@ -94,8 +94,7 @@ class TestingCluster(userConfiguration: Configuration,
         executionRetries,
         delayBetweenRetries,
         timeout,
-        streamingMode)
-      with TestingJobManager)
+        streamingMode))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 8a7297b..987af40 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,10 +21,14 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.{Cancellable, Terminated, ActorRef}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.{StreamingMode, FlinkActor}
 import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -32,15 +36,46 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
 
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 
 import scala.language.postfixOps
 
-/** Mixin for [[TestingJobManager]] to support testing messages
- */
-trait TestingJobManager extends FlinkActor {
-  that: JobManager =>
+/** JobManager implementation extended by testing messages
+  *
+  * @param flinkConfiguration
+  * @param executionContext
+  * @param instanceManager
+  * @param scheduler
+  * @param libraryCacheManager
+  * @param archive
+  * @param defaultExecutionRetries
+  * @param delayBetweenRetries
+  * @param timeout
+  * @param mode
+  */
+class TestingJobManager(
+    flinkConfiguration: Configuration,
+    executionContext: ExecutionContext,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    defaultExecutionRetries: Int,
+    delayBetweenRetries: Long,
+    timeout: FiniteDuration,
+    mode: StreamingMode)
+  extends JobManager(
+    flinkConfiguration,
+    executionContext,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    defaultExecutionRetries,
+    delayBetweenRetries,
+    timeout,
+    mode) {
 
   import scala.collection.JavaConverters._
   import context._
@@ -60,7 +95,7 @@ trait TestingJobManager extends FlinkActor {
 
   var disconnectDisabled = false
 
-  abstract override def handleMessage: Receive = {
+  override def handleMessage: Receive = {
     handleTestingMessage orElse super.handleMessage
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index f105ccc..2ccddfa 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -22,13 +22,13 @@ import org.apache.flink.runtime.{FlinkActor}
 import org.apache.flink.runtime.jobmanager.MemoryArchivist
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
 
-/**
- * Mixin for the [[MemoryArchivist]] to support testing messages
- */
-trait TestingMemoryArchivist extends FlinkActor {
-  self: MemoryArchivist =>
+/** Memory archivist extended by testing messages
+  *
+  * @param maxEntries number of maximum number of archived jobs
+  */
+class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
 
-  abstract override def handleMessage: Receive = {
+  override def handleMessage: Receive = {
     handleTestingMessage orElse super.handleMessage
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index e83c7a6..faf8424 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,10 +22,7 @@ import akka.actor.{Props, ActorRef, ActorSystem}
 import akka.pattern.Patterns._
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.AkkaActorGateway
-import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManager,
@@ -95,14 +92,12 @@ class ForkableFlinkMiniCluster(
       archiveCount) = JobManager.createJobManagerComponents(configuration)
 
     val testArchiveProps = Props(
-      new MemoryArchivist(
-        archiveCount)
-      with TestingMemoryArchivist)
+      new TestingMemoryArchivist(archiveCount))
 
     val archiver = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
     
     val jobManagerProps = Props(
-      new JobManager(
+      new TestingJobManager(
         configuration,
         executionContext,
         instanceManager,
@@ -112,8 +107,7 @@ class ForkableFlinkMiniCluster(
         executionRetries,
         delayBetweenRetries,
         timeout,
-        streamingMode)
-      with TestingJobManager)
+        streamingMode))
 
     val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)