You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/19 17:33:40 UTC
flink git commit: [tests] Replaces Scala mixins for the
Job/TaskManager with classes which extend the respective classes. Adds proper
logger registration for sub-classes of FlinkUntypedActor.
Repository: flink
Updated Branches:
refs/heads/master 8602b7af0 -> baaa3daca
[tests] Replaces Scala mixins for the Job/TaskManager with classes which extend the respective classes. Adds proper logger registration for sub-classes of FlinkUntypedActor.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baaa3dac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baaa3dac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baaa3dac
Branch: refs/heads/master
Commit: baaa3dacabdfa87c3c11882810e05dc384f24038
Parents: 8602b7a
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 19 16:16:31 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 19 16:16:31 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/akka/FlinkUntypedActor.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 -
.../runtime/testingUtils/TestingCluster.scala | 9 ++--
.../testingUtils/TestingJobManager.scala | 49 +++++++++++++++++---
.../testingUtils/TestingMemoryArchivist.scala | 12 ++---
.../test/util/ForkableFlinkMiniCluster.scala | 14 ++----
6 files changed, 57 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index bba2aeb..1456758 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -39,7 +39,7 @@ import java.util.UUID;
* a leader session ID option which is returned by getLeaderSessionID.
*/
public abstract class FlinkUntypedActor extends UntypedActor {
- protected static Logger LOG = LoggerFactory.getLogger(FlinkUntypedActor.class);
+ protected Logger LOG = LoggerFactory.getLogger(getClass());
/**
* This method is called by Akka if a new message has arrived for the actor. It logs the
http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5c0f468..92688fa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,7 +25,6 @@ import java.util.{UUID, Collections}
import akka.actor.Status.{Failure, Success}
import akka.actor._
-import _root_.akka.pattern.ask
import grizzled.slf4j.Logger
@@ -69,7 +68,6 @@ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
/**
* The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index f5a506d..057ffeb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorRef, Props, ActorSystem}
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
@@ -80,11 +80,11 @@ class TestingCluster(userConfiguration: Configuration,
timeout,
archiveCount) = JobManager.createJobManagerComponents(configuration)
- val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
+ val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
val jobManagerProps = Props(
- new JobManager(
+ new TestingJobManager(
configuration,
executionContext,
instanceManager,
@@ -94,8 +94,7 @@ class TestingCluster(userConfiguration: Configuration,
executionRetries,
delayBetweenRetries,
timeout,
- streamingMode)
- with TestingJobManager)
+ streamingMode))
val dispatcherJobManagerProps = if (synchronousDispatcher) {
// disable asynchronous futures (e.g. accumulator update in Heartbeat)
http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 8a7297b..987af40 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,10 +21,14 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{Cancellable, Terminated, ActorRef}
import akka.pattern.{ask, pipe}
import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.{StreamingMode, FlinkActor}
import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.Messages.Disconnect
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -32,15 +36,46 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
-/** Mixin for [[TestingJobManager]] to support testing messages
- */
-trait TestingJobManager extends FlinkActor {
- that: JobManager =>
+/** JobManager implementation extended by testing messages
+ *
+ * @param flinkConfiguration
+ * @param executionContext
+ * @param instanceManager
+ * @param scheduler
+ * @param libraryCacheManager
+ * @param archive
+ * @param defaultExecutionRetries
+ * @param delayBetweenRetries
+ * @param timeout
+ * @param mode
+ */
+class TestingJobManager(
+ flinkConfiguration: Configuration,
+ executionContext: ExecutionContext,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ defaultExecutionRetries: Int,
+ delayBetweenRetries: Long,
+ timeout: FiniteDuration,
+ mode: StreamingMode)
+ extends JobManager(
+ flinkConfiguration,
+ executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ defaultExecutionRetries,
+ delayBetweenRetries,
+ timeout,
+ mode) {
import scala.collection.JavaConverters._
import context._
@@ -60,7 +95,7 @@ trait TestingJobManager extends FlinkActor {
var disconnectDisabled = false
- abstract override def handleMessage: Receive = {
+ override def handleMessage: Receive = {
handleTestingMessage orElse super.handleMessage
}
http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index f105ccc..2ccddfa 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -22,13 +22,13 @@ import org.apache.flink.runtime.{FlinkActor}
import org.apache.flink.runtime.jobmanager.MemoryArchivist
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
-/**
- * Mixin for the [[MemoryArchivist]] to support testing messages
- */
-trait TestingMemoryArchivist extends FlinkActor {
- self: MemoryArchivist =>
+/** Memory archivist extended by testing messages
+ *
+ * @param maxEntries maximum number of archived jobs
+ */
+class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) {
- abstract override def handleMessage: Receive = {
+ override def handleMessage: Receive = {
handleTestingMessage orElse super.handleMessage
}
http://git-wip-us.apache.org/repos/asf/flink/blob/baaa3dac/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index e83c7a6..faf8424 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,10 +22,7 @@ import akka.actor.{Props, ActorRef, ActorSystem}
import akka.pattern.Patterns._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.AkkaActorGateway
-import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManager,
@@ -95,14 +92,12 @@ class ForkableFlinkMiniCluster(
archiveCount) = JobManager.createJobManagerComponents(configuration)
val testArchiveProps = Props(
- new MemoryArchivist(
- archiveCount)
- with TestingMemoryArchivist)
+ new TestingMemoryArchivist(archiveCount))
val archiver = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME)
val jobManagerProps = Props(
- new JobManager(
+ new TestingJobManager(
configuration,
executionContext,
instanceManager,
@@ -112,8 +107,7 @@ class ForkableFlinkMiniCluster(
executionRetries,
delayBetweenRetries,
timeout,
- streamingMode)
- with TestingJobManager)
+ streamingMode))
val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)