You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:00 UTC

[64/82] [abbrv] incubator-flink git commit: Execute lookupConnectionInformation and UpdateTaskExecutionState concurrently within futures

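Execute lookupConnectionInformation and UpdateTaskExecutionState concurrently within futures. Also raise the default Akka startup timeout from 10 s to 60 s, lower the Akka loglevel and stdout-loglevel from DEBUG to WARNING, and pass the user code class loader to the ExecutionGraph constructor.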


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0516d266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0516d266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0516d266

Branch: refs/heads/master
Commit: 0516d266d21bcd9d17199880a0d5ebe6d9760fda
Parents: c93d9ea
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 16 12:44:17 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:32 2014 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  4 ++--
 .../flink/runtime/jobmanager/JobManager.scala   | 20 ++++++++++++--------
 .../flink/test/util/AbstractTestBase.java       |  1 +
 4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index bbbe6d4..2d97504 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -580,7 +580,7 @@ public final class ConfigConstants {
 	
 	// ------------------------------ Akka Values ------------------------------
 
-	public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "10 s";
+	public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
 
 	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 229c229..97ce343 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -206,9 +206,9 @@ object AkkaUtils {
       |
       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
       |  logger-startup-timeout = 30s
-      |  loglevel = "DEBUG"
+      |  loglevel = "WARNING"
       |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-      |  stdout-loglevel = "DEBUG"
+      |  stdout-loglevel = "WARNING"
       |  jvm-exit-on-fatal-error = off
       |  log-config-on-start = off
       |  serialize-messages = on

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8752bef..17f123a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -133,10 +133,12 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
           // Create the user code class loader
           libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
 
+          val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
+
           val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
             (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(),
-              System.currentTimeMillis())))
+              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys, userCodeLoader),
+              JobInfo(sender(), System.currentTimeMillis())))
 
           val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
             jobGraph.getNumberOfExecutionRetries
@@ -147,8 +149,6 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
           executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
           executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
 
-          val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
-
           if (userCodeLoader == null) {
             throw new JobException("The user code class loader could not be initialized.")
           }
@@ -253,7 +253,9 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
             val originalSender = sender()
-            originalSender ! executionGraph.updateState(taskExecutionState)
+            Future {
+              originalSender ! executionGraph.updateState(taskExecutionState)
+            }
           case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
             .getJobID} to change state to ${taskExecutionState.getExecutionState}.")
             sender() ! false
@@ -350,9 +352,11 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
           val originalSender = sender()
-          originalSender ! ConnectionInformation(
-            executionGraph.lookupConnectionInfoAndDeployReceivers
-              (connectionInformation, sourceChannelID))
+          Future {
+            originalSender ! ConnectionInformation(
+              executionGraph.lookupConnectionInfoAndDeployReceivers
+                (connectionInformation, sourceChannelID))
+          }
         case None =>
           log.error(s"Cannot find execution graph for job ID ${jobID}.")
           sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b24b3ee..b4da64d 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import org.junit.Assert;
 import scala.concurrent.duration.FiniteDuration;
 
 public abstract class AbstractTestBase extends TestBaseUtils {