Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/22 19:45:00 UTC

flink git commit: [FLINK-2388] forward message to archivist in case accumulators can't be found

Repository: flink
Updated Branches:
  refs/heads/master 72b5dc980 -> aa6a7f04c


[FLINK-2388] forward message to archivist in case accumulators can't be found

- return AccumulatorResultsNotFound in case the archive cannot find them either

This closes #930.
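
The messages exchanged here (RequestAccumulatorResults, AccumulatorResultsFound,
AccumulatorResultsNotFound, AccumulatorResultsErroneous) live in
org.apache.flink.runtime.messages.accumulators, as the diff below shows. As a rough,
hypothetical sketch of the resulting protocol (the object, method and actor names are
illustrative and not part of this commit), a caller could ask the JobManager and handle
the new not-found reply roughly like this:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import org.apache.flink.api.common.JobID
    import org.apache.flink.runtime.messages.accumulators._

    import scala.concurrent.ExecutionContext

    object AccumulatorQueryExample {
      // Hypothetical helper: ask the JobManager for the accumulators of a job
      // and react to the possible replies used by this change.
      def queryAccumulators(jobManager: ActorRef, jobId: JobID)
                           (implicit timeout: Timeout, ec: ExecutionContext): Unit = {
        (jobManager ? RequestAccumulatorResults(jobId)) foreach {
          case AccumulatorResultsFound(id, serializedValues) =>
            println(s"Accumulators for job $id: $serializedValues")
          case AccumulatorResultsNotFound(id) =>
            // Neither the running jobs nor the archive know this job id.
            println(s"No accumulator results found for job $id")
          case AccumulatorResultsErroneous(id, e) =>
            println(s"Fetching accumulators for job $id failed: ${e.getMessage}")
          case other =>
            println(s"Unexpected reply: $other")
        }
      }
    }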


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa6a7f04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa6a7f04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa6a7f04

Branch: refs/heads/master
Commit: aa6a7f04c54a1c3675b1d78a3d0cc885a1362fa9
Parents: 72b5dc9
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Jul 22 10:38:43 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Jul 22 16:32:48 2015 +0200

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  |  9 ---
 .../runtime/executiongraph/ExecutionGraph.java  | 29 ++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 70 +++++---------------
 .../runtime/jobmanager/MemoryArchivist.scala    | 26 ++++++++
 4 files changed, 73 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 3e2e359..3907004 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -105,15 +105,6 @@ public class AccumulatorHelper {
 		return resultMap;
 	}
 
-	public static String getAccumulatorsFormated(Map<?, Accumulator<?, ?>> newAccumulators) {
-		StringBuilder builder = new StringBuilder();
-		for (Map.Entry<?, Accumulator<?, ?>> entry : newAccumulators.entrySet()) {
-			builder.append("- " + entry.getKey() + " (" + entry.getValue().getClass().getName()
-					+ ")" + ": " + entry.getValue().toString() + "\n");
-		}
-		return builder.toString();
-	}
-
 	public static String getResultsFormated(Map<String, Object> map) {
 		StringBuilder builder = new StringBuilder();
 		for (Map.Entry<String, Object> entry : map.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 677c809..9c977fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -580,6 +581,34 @@ public class ExecutionGraph implements Serializable {
 		return result;
 	}
 
+	/**
+	 * Returns a stringified version of the user-defined accumulators.
+	 * @return an Array containing the StringifiedAccumulatorResult objects
+	 */
+	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
+
+		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
+
+		int num = accumulatorMap.size();
+		StringifiedAccumulatorResult[] resultStrings = new StringifiedAccumulatorResult[num];
+
+		int i = 0;
+		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+
+			StringifiedAccumulatorResult result;
+			Accumulator<?, ?> value = entry.getValue();
+			if (value != null) {
+				result = new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString());
+			} else {
+				result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
+			}
+
+			resultStrings[i++] = result;
+		}
+
+		return resultStrings;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d8d51ce..c195a78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -32,7 +32,7 @@ import grizzled.slf4j.Logger
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, StringifiedAccumulatorResult}
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
@@ -684,65 +684,31 @@ class JobManager(
    * @param message The accumulator message.
    */
   private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
-    message match {
-
-      case RequestAccumulatorResults(jobID) =>
-        try {
-          val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = {
+      message match {
+        case RequestAccumulatorResults(jobID) =>
+          try {
             currentJobs.get(jobID) match {
               case Some((graph, jobInfo)) =>
-                graph.getAccumulatorsSerialized
+                val accumulatorValues = graph.getAccumulatorsSerialized()
+                sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
               case None =>
-                null // TODO check also archive
+                archive.forward(message)
             }
-          }
-
-          sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
-        }
-        catch {
+          } catch {
           case e: Exception =>
-            log.error("Cannot serialize accumulator result", e)
+            log.error("Cannot serialize accumulator result.", e)
             sender() ! AccumulatorResultsErroneous(jobID, e)
-        }
-
-      case RequestAccumulatorResultsStringified(jobId) =>
-        try {
-          val accumulatorValues: Array[StringifiedAccumulatorResult] = {
-            currentJobs.get(jobId) match {
-              case Some((graph, jobInfo)) =>
-                val accumulators = graph.aggregateUserAccumulators()
-
-                val result: Array[StringifiedAccumulatorResult] = new
-                    Array[StringifiedAccumulatorResult](accumulators.size)
-
-                var i = 0
-                accumulators foreach {
-                  case (name, accumulator) =>
-                    val (typeString, valueString) =
-                      if (accumulator != null) {
-                        (accumulator.getClass.getSimpleName, accumulator.toString)
-                      } else {
-                        (null, null)
-                      }
-                    result(i) = new StringifiedAccumulatorResult(name, typeString, valueString)
-                    i += 1
-                }
-                result
-              case None =>
-                null // TODO check also archive
-            }
           }
 
-          sender() ! AccumulatorResultStringsFound(jobId, accumulatorValues)
-        }
-        catch {
-          case e: Exception =>
-            log.error("Cannot fetch accumulator result", e)
-            sender() ! AccumulatorResultsErroneous(jobId, e)
-        }
-
-      case x => unhandled(x)
-    }
+        case RequestAccumulatorResultsStringified(jobId) =>
+          currentJobs.get(jobId) match {
+            case Some((graph, jobInfo)) =>
+              val stringifiedAccumulators = graph.getAccumulatorResultsStringified()
+              sender() ! AccumulatorResultStringsFound(jobId, stringifiedAccumulators)
+            case None =>
+              archive.forward(message)
+          }
+      }
   }
 
   /**
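
The key behavioral change above: when the job id is not among the currently running jobs,
the request is no longer answered with null (the old "TODO check also archive" path) but
forwarded to the archivist via archive.forward(message). Akka's forward keeps the original
sender, so when the MemoryArchivist (next diff) replies with sender() ! ..., the answer goes
straight back to the original requester rather than to the JobManager. A minimal standalone
sketch of that pattern, with hypothetical Front/Backend actors that are not Flink classes:

    import akka.actor.{Actor, ActorRef}

    // Front hands every message on; forward() preserves the original sender().
    class Front(backend: ActorRef) extends Actor {
      override def receive: Receive = {
        case msg => backend forward msg
      }
    }

    // Backend's reply therefore reaches whoever wrote to Front, not Front itself.
    class Backend extends Actor {
      override def receive: Receive = {
        case msg => sender() ! s"handled: $msg"
      }
    }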

http://git-wip-us.apache.org/repos/asf/flink/blob/aa6a7f04/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 6d0b220..7572e72 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -24,6 +24,7 @@ import akka.actor.Actor
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
@@ -120,6 +121,31 @@ class MemoryArchivist(private val max_entries: Int)
       catch {
         case t: Throwable => log.error("Exception while creating the jobs overview", t)
       }
+
+
+    case RequestAccumulatorResults(jobID) =>
+      try {
+        graphs.get(jobID) match {
+          case Some(graph) =>
+            val accumulatorValues = graph.getAccumulatorsSerialized()
+            sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
+          case None =>
+            sender() ! AccumulatorResultsNotFound(jobID)
+        }
+      } catch {
+        case e: Exception =>
+          log.error("Cannot serialize accumulator result.", e)
+          sender() ! AccumulatorResultsErroneous(jobID, e)
+      }
+
+      case RequestAccumulatorResultsStringified(jobID) =>
+        graphs.get(jobID) match {
+          case Some(graph) =>
+            val accumulatorValues = graph.getAccumulatorResultsStringified()
+            sender() ! AccumulatorResultStringsFound(jobID, accumulatorValues)
+          case None =>
+            sender() ! AccumulatorResultsNotFound(jobID)
+        }
   }
 
   /**