You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:00 UTC
[64/82] [abbrv] incubator-flink git commit: Execute lookupConnectionInformation and UpdateTaskExecutionState concurrently within futures
Execute lookupConnectionInformation and UpdateTaskExecutionState concurrently within futures
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0516d266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0516d266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0516d266
Branch: refs/heads/master
Commit: 0516d266d21bcd9d17199880a0d5ebe6d9760fda
Parents: c93d9ea
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 16 12:44:17 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:32 2014 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 2 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 4 ++--
.../flink/runtime/jobmanager/JobManager.scala | 20 ++++++++++++--------
.../flink/test/util/AbstractTestBase.java | 1 +
4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index bbbe6d4..2d97504 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -580,7 +580,7 @@ public final class ConfigConstants {
// ------------------------------ Akka Values ------------------------------
- public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "10 s";
+ public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 229c229..97ce343 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -206,9 +206,9 @@ object AkkaUtils {
|
| loggers = ["akka.event.slf4j.Slf4jLogger"]
| logger-startup-timeout = 30s
- | loglevel = "DEBUG"
+ | loglevel = "WARNING"
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
- | stdout-loglevel = "DEBUG"
+ | stdout-loglevel = "WARNING"
| jvm-exit-on-fatal-error = off
| log-config-on-start = off
| serialize-messages = on
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8752bef..17f123a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -133,10 +133,12 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
// Create the user code class loader
libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
+ val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
+
val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
(new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
- jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(),
- System.currentTimeMillis())))
+ jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys, userCodeLoader),
+ JobInfo(sender(), System.currentTimeMillis())))
val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
jobGraph.getNumberOfExecutionRetries
@@ -147,8 +149,6 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
- val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
-
if (userCodeLoader == null) {
throw new JobException("The user code class loader could not be initialized.")
}
@@ -253,7 +253,9 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
currentJobs.get(taskExecutionState.getJobID) match {
case Some((executionGraph, _)) =>
val originalSender = sender()
- originalSender ! executionGraph.updateState(taskExecutionState)
+ Future {
+ originalSender ! executionGraph.updateState(taskExecutionState)
+ }
case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
.getJobID} to change state to ${taskExecutionState.getExecutionState}.")
sender() ! false
@@ -350,9 +352,11 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
val originalSender = sender()
- originalSender ! ConnectionInformation(
- executionGraph.lookupConnectionInfoAndDeployReceivers
- (connectionInformation, sourceChannelID))
+ Future {
+ originalSender ! ConnectionInformation(
+ executionGraph.lookupConnectionInfoAndDeployReceivers
+ (connectionInformation, sourceChannelID))
+ }
case None =>
log.error(s"Cannot find execution graph for job ID ${jobID}.")
sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0516d266/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b24b3ee..b4da64d 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
+import org.junit.Assert;
import scala.concurrent.duration.FiniteDuration;
public abstract class AbstractTestBase extends TestBaseUtils {