You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/04/30 10:33:16 UTC
flink git commit: [FLINK-1843][jobmanager] remove SoftReferences on
archived ExecutionGraphs
Repository: flink
Updated Branches:
refs/heads/master 8e429ced2 -> 84790ef62
[FLINK-1843][jobmanager] remove SoftReferences on archived ExecutionGraphs
The previously introduced SoftReferences to store archived
ExecutionGraphs cleared old graphs in a non-transparent order.
This closes #639.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84790ef6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84790ef6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84790ef6
Branch: refs/heads/master
Commit: 84790ef627651c884ebab4ee4269d33d6990a3ca
Parents: 8e429ce
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Apr 29 12:34:31 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Apr 30 10:29:44 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 4 +--
.../runtime/jobmanager/MemoryArchivist.scala | 33 ++++++--------------
.../runtime/messages/ArchiveMessages.scala | 8 +++++
.../testingUtils/TestingMemoryArchivist.scala | 2 +-
4 files changed, 20 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 83f9e35..15a9446 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -292,7 +292,7 @@ class JobManager(val flinkConfiguration: Configuration,
if (newJobStatus.isTerminalState) {
jobInfo.end = timeStamp
- // is the client waiting for the job result?
+ // is the client waiting for the job result?
newJobStatus match {
case JobStatus.FINISHED =>
val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
@@ -326,7 +326,7 @@ class JobManager(val flinkConfiguration: Configuration,
}
case None =>
removeJob(jobID)
- }
+ }
case msg: BarrierAck =>
currentJobs.get(msg.jobID) match {
http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 9e71ebb..62ea435 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.ArchiveMessages._
import org.apache.flink.runtime.messages.JobManagerMessages._
import scala.collection.mutable
-import scala.ref.SoftReference
/**
* Actor which stores terminated Flink jobs. The number of stored Flink jobs is set by max_entries.
@@ -56,28 +55,31 @@ class MemoryArchivist(private val max_entries: Int)
* Map of execution graphs belonging to recently started jobs with the time stamp of the last
* received job event. The insert order is preserved through a LinkedHashMap.
*/
- val graphs = mutable.LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
+ val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
override def receiveWithLogMessages: Receive = {
/* Receive Execution Graph to archive */
case ArchiveExecutionGraph(jobID, graph) =>
// wrap graph inside a soft reference
- graphs.update(jobID, new SoftReference(graph))
-
+ graphs.update(jobID, graph)
trimHistory()
+ case RequestArchivedJob(jobID: JobID) =>
+ val graph = graphs.get(jobID)
+ sender ! ArchivedJob(graph)
+
case RequestArchivedJobs =>
- sender ! ArchivedJobs(getAllGraphs)
+ sender ! ArchivedJobs(graphs.values)
case RequestJob(jobID) =>
- getGraph(jobID) match {
+ graphs.get(jobID) match {
case Some(graph) => sender ! JobFound(jobID, graph)
case None => sender ! JobNotFound(jobID)
}
case RequestJobStatus(jobID) =>
- getGraph(jobID) match {
+ graphs.get(jobID) match {
case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
case None => sender ! JobNotFound(jobID)
}
@@ -92,23 +94,6 @@ class MemoryArchivist(private val max_entries: Int)
}
/**
- * Gets all graphs that have not been garbage collected.
- * @return An iterable with all valid ExecutionGraphs
- */
- protected def getAllGraphs: Iterable[ExecutionGraph] = graphs.values.flatMap(_.get)
-
- /**
- * Gets a graph with a jobID if it has not been garbage collected.
- * @param jobID
- * @return ExecutionGraph or null
- */
- protected def getGraph(jobID: JobID): Option[ExecutionGraph] = graphs.get(jobID) match {
- case Some(softRef) => softRef.get
- case None => None
- }
-
-
- /**
* Remove old ExecutionGraphs belonging to a jobID
* * if more than max_entries are in the queue.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index b4ed2cc..e9e7dec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -34,6 +34,14 @@ object ArchiveMessages {
case object RequestArchivedJobs
/**
+ * Reqeuest a specific ExecutionGraph by JobID. The response is [[RequestArchivedJob]]
+ * @param jobID
+ */
+ case class RequestArchivedJob(jobID: JobID)
+
+ case class ArchivedJob(job: Option[ExecutionGraph])
+
+ /**
* Response to [[RequestArchivedJobs]] message. The response contains the archived jobs.
* @param jobs
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/84790ef6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 88d3cd0..b8d1217 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -34,7 +34,7 @@ trait TestingMemoryArchivist extends ActorLogMessages {
def receiveTestingMessages: Receive = {
case RequestExecutionGraph(jobID) =>
- val executionGraph = getGraph(jobID)
+ val executionGraph = graphs.get(jobID)
executionGraph match {
case Some(graph) => sender ! ExecutionGraphFound(jobID, graph)