You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/22 19:45:00 UTC
flink git commit: [FLINK-2388] forward message to archivist in case
accumulators can't be found
Repository: flink
Updated Branches:
refs/heads/master 72b5dc980 -> aa6a7f04c
[FLINK-2388] forward message to archivist in case accumulators can't be found
- return AccumulatorsNotFound in case the archive cannot find them either
This closes #930.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa6a7f04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa6a7f04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa6a7f04
Branch: refs/heads/master
Commit: aa6a7f04c54a1c3675b1d78a3d0cc885a1362fa9
Parents: 72b5dc9
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Jul 22 10:38:43 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Jul 22 16:32:48 2015 +0200
----------------------------------------------------------------------
.../common/accumulators/AccumulatorHelper.java | 9 ---
.../runtime/executiongraph/ExecutionGraph.java | 29 ++++++++
.../flink/runtime/jobmanager/JobManager.scala | 70 +++++---------------
.../runtime/jobmanager/MemoryArchivist.scala | 26 ++++++++
4 files changed, 73 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 3e2e359..3907004 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -105,15 +105,6 @@ public class AccumulatorHelper {
return resultMap;
}
- public static String getAccumulatorsFormated(Map<?, Accumulator<?, ?>> newAccumulators) {
- StringBuilder builder = new StringBuilder();
- for (Map.Entry<?, Accumulator<?, ?>> entry : newAccumulators.entrySet()) {
- builder.append("- " + entry.getKey() + " (" + entry.getValue().getClass().getName()
- + ")" + ": " + entry.getValue().toString() + "\n");
- }
- return builder.toString();
- }
-
public static String getResultsFormated(Map<String, Object> map) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, Object> entry : map.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 677c809..9c977fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -580,6 +581,34 @@ public class ExecutionGraph implements Serializable {
return result;
}
+ /**
+ * Returns the a stringified version of the user-defined accumulators.
+ * @return an Array containing the StringifiedAccumulatorResult objects
+ */
+ public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
+
+ Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
+
+ int num = accumulatorMap.size();
+ StringifiedAccumulatorResult[] resultStrings = new StringifiedAccumulatorResult[num];
+
+ int i = 0;
+ for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+
+ StringifiedAccumulatorResult result;
+ Accumulator<?, ?> value = entry.getValue();
+ if (value != null) {
+ result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+ } else {
+ result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+ }
+
+ resultStrings[i++] = result;
+ }
+
+ return resultStrings;
+ }
+
// --------------------------------------------------------------------------------------------
// Actions
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d8d51ce..c195a78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -32,7 +32,7 @@ import grizzled.slf4j.Logger
import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, StringifiedAccumulatorResult}
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
@@ -684,65 +684,31 @@ class JobManager(
* @param message The accumulator message.
*/
private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
- message match {
-
- case RequestAccumulatorResults(jobID) =>
- try {
- val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = {
+ message match {
+ case RequestAccumulatorResults(jobID) =>
+ try {
currentJobs.get(jobID) match {
case Some((graph, jobInfo)) =>
- graph.getAccumulatorsSerialized
+ val accumulatorValues = graph.getAccumulatorsSerialized()
+ sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
case None =>
- null // TODO check also archive
+ archive.forward(message)
}
- }
-
- sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
- }
- catch {
+ } catch {
case e: Exception =>
- log.error("Cannot serialize accumulator result", e)
+ log.error("Cannot serialize accumulator result.", e)
sender() ! AccumulatorResultsErroneous(jobID, e)
- }
-
- case RequestAccumulatorResultsStringified(jobId) =>
- try {
- val accumulatorValues: Array[StringifiedAccumulatorResult] = {
- currentJobs.get(jobId) match {
- case Some((graph, jobInfo)) =>
- val accumulators = graph.aggregateUserAccumulators()
-
- val result: Array[StringifiedAccumulatorResult] = new
- Array[StringifiedAccumulatorResult](accumulators.size)
-
- var i = 0
- accumulators foreach {
- case (name, accumulator) =>
- val (typeString, valueString) =
- if (accumulator != null) {
- (accumulator.getClass.getSimpleName, accumulator.toString)
- } else {
- (null, null)
- }
- result(i) = new StringifiedAccumulatorResult(name, typeString, valueString)
- i += 1
- }
- result
- case None =>
- null // TODO check also archive
- }
}
- sender() ! AccumulatorResultStringsFound(jobId, accumulatorValues)
- }
- catch {
- case e: Exception =>
- log.error("Cannot fetch accumulator result", e)
- sender() ! AccumulatorResultsErroneous(jobId, e)
- }
-
- case x => unhandled(x)
- }
+ case RequestAccumulatorResultsStringified(jobId) =>
+ currentJobs.get(jobId) match {
+ case Some((graph, jobInfo)) =>
+ val stringifiedAccumulators = graph.getAccumulatorResultsStringified()
+ sender() ! AccumulatorResultStringsFound(jobId, stringifiedAccumulators)
+ case None =>
+ archive.forward(message)
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 6d0b220..7572e72 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -24,6 +24,7 @@ import akka.actor.Actor
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.webmonitor._
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
@@ -120,6 +121,31 @@ class MemoryArchivist(private val max_entries: Int)
catch {
case t: Throwable => log.error("Exception while creating the jobs overview", t)
}
+
+
+ case RequestAccumulatorResults(jobID) =>
+ try {
+ graphs.get(jobID) match {
+ case Some(graph) =>
+ val accumulatorValues = graph.getAccumulatorsSerialized()
+ sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
+ case None =>
+ sender() ! AccumulatorResultsNotFound(jobID)
+ }
+ } catch {
+ case e: Exception =>
+ log.error("Cannot serialize accumulator result.", e)
+ sender() ! AccumulatorResultsErroneous(jobID, e)
+ }
+
+ case RequestAccumulatorResultsStringified(jobID) =>
+ graphs.get(jobID) match {
+ case Some(graph) =>
+ val accumulatorValues = graph.getAccumulatorResultsStringified()
+ sender() ! AccumulatorResultStringsFound(jobID, accumulatorValues)
+ case None =>
+ sender() ! AccumulatorResultsNotFound(jobID)
+ }
}
/**