You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:47 UTC
[51/82] [abbrv] incubator-flink git commit: Add debug guardians to suppress string generation which caused a significant performance loss.
Add debug guardians to suppress string generation which caused a significant performance loss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd9a1ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd9a1ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd9a1ba4
Branch: refs/heads/master
Commit: dd9a1ba43579f18bf536442ae3f5854c64a3791f
Parents: f726e55
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 10 10:50:45 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 2 +-
.../apache/flink/runtime/ActorLogMessages.scala | 8 ++++--
.../apache/flink/runtime/akka/AkkaUtils.scala | 6 ++++
.../flink/runtime/jobmanager/JobManager.scala | 30 +++++++++++++-------
.../flink/runtime/taskmanager/TaskManager.scala | 14 +++++++--
.../runtime/testingUtils/TestingCluster.scala | 8 ------
.../flink/test/util/AbstractTestBase.java | 1 +
.../test/util/ForkableFlinkMiniCluster.scala | 4 ---
.../src/test/resources/log4j-test.properties | 2 +-
pom.xml | 12 ++++----
10 files changed, 51 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 15b76f7..c615821 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -586,7 +586,7 @@ public final class ConfigConstants {
public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
- public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 min";
+ public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 m";
public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index b39c11d..892e68d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -30,11 +30,15 @@ trait ActorLogMessages {
override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
override def apply(x: Any):Unit = {
- log.debug("Received message {} from {}.", x, self.sender)
+ if(log.isDebugEnabled) {
+ log.debug(s"Received message ${x} from ${self.sender}.")
+ }
val start = System.nanoTime()
_receiveWithLogMessages(x)
val duration = (System.nanoTime() - start) / 1000000
- log.debug(s"Handled message {} in {} ms from {}.", x, duration, self.sender)
+ if(log.isDebugEnabled) {
+ log.debug(s"Handled message ${x} in ${duration} ms from ${self.sender}.")
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 0c5405e..b23f620 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -113,7 +113,13 @@ object AkkaUtils {
|
| actor{
| default-dispatcher{
+ | executor = "default-executor"
+ |
| throughput = ${akkaThroughput}
+ |
+ | fork-join-executor {
+ | parallelism-factor = 2.0
+ | }
| }
| }
|
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index eb0d79c..6dbcb67 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -92,7 +92,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
instanceManager.addInstanceListener(scheduler)
- log.info(s"Started job manager. Waiting for incoming messages.")
+ log.info("Started job manager. Waiting for incoming messages.")
override def postStop(): Unit = {
log.info(s"Stopping job manager ${self.path}.")
@@ -128,7 +128,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
" null."))
} else {
- log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}}).")
+ log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
// Create the user code class loader
libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
@@ -153,8 +153,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
throw new JobException("The user code class loader could not be initialized.")
}
- log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
- .getName}).")
+ if(log.isDebugEnabled) {
+ log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
+ .getName}).")
+ }
for (vertex <- jobGraph.getVertices) {
val executableClass = vertex.getInvokableClassName
@@ -169,13 +171,17 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
// topological sorting of the job vertices
val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
- log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
- .getJobID} (${jobGraph.getName}).")
+ if(log.isDebugEnabled) {
+ log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
+ .getJobID} (${jobGraph.getName}).")
+ }
executionGraph.attachJobGraph(sortedTopology)
- log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
- s"(${jobGraph.getName}).")
+ if(log.isDebugEnabled) {
+ log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
+ s"(${jobGraph.getName}).")
+ }
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
@@ -261,7 +267,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
val execution = executionGraph.getRegisteredExecutions().get(executionAttempt)
if(execution == null){
- log.error("Can not find Execution for attempt " + executionAttempt)
+ log.error(s"Can not find Execution for attempt ${executionAttempt}.")
null
}else{
val slot = execution.getAssignedResource
@@ -289,7 +295,9 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
null
}
- log.debug("Send next input split {}.", nextInputSplit)
+ if(log.isDebugEnabled) {
+ log.debug(s"Send next input split ${nextInputSplit}.")
+ }
sender() ! NextInputSplit(nextInputSplit)
}
@@ -297,7 +305,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName
log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to " +
- s"${newJobStatus}${optionalMessage}.")
+ s"${newJobStatus}${if(optionalMessage == null) "" else optionalMessage}.")
if(newJobStatus.isTerminalState) {
jobInfo.end = timeStamp
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index b1cfda7..80af0e9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -311,7 +311,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
libraryCacheManager.unregisterTask(jobID, executionID)
} catch {
case ioe: IOException =>
- log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
+ if(log.isDebugEnabled) {
+ log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
+ }
}
}
@@ -326,11 +328,17 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
case LogMemoryUsage => {
memoryMXBean foreach {
- mxbean => log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean))
+ mxbean =>
+ if(log.isDebugEnabled) {
+ log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean))
+ }
}
gcMXBeans foreach {
- mxbeans => log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans))
+ mxbeans =>
+ if(log.isDebugEnabled) {
+ log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 3c11e1f..5a51265 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -37,14 +37,6 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
cfg
}
- override def getJobManagerAkkaConfigString(): String = {
- super.getJobManagerAkkaConfigString() + TestingUtils.getTestingSerializationBindings
- }
-
- override def getTaskManagerAkkaConfigString(index: Int): String = {
- super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
- }
-
override def startJobManager(implicit system: ActorSystem) = {
system.actorOf(Props(new JobManager(configuration) with TestingJobManager),
JobManager.JOB_MANAGER_NAME)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c30d976..af92dbb 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -102,6 +102,7 @@ public abstract class AbstractTestBase {
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+ config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, 1000000);
this.executor = new ForkableFlinkMiniCluster(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index cf85533..8761ec6 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -52,10 +52,6 @@ LocalFlinkMiniCluster(userConfiguration) {
super.generateConfiguration(config)
}
- override def getTaskManagerAkkaConfigString(index: Int): String = {
- super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
- }
-
override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
val config = configuration.clone()
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 2c2d4ff..0b686e5 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -17,7 +17,7 @@
################################################################################
# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=DEBUG, A1
+log4j.rootLogger=OFF, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6739948..3cbfaa7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@ under the License.
<slf4j.version>1.7.7</slf4j.version>
<guava.version>17.0</guava.version>
<scala.version>2.10.4</scala.version>
- <akka.version>2.3.6</akka.version>
+ <akka.version>2.3.7</akka.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.macros.version>2.0.1</scala.macros.version>
</properties>
@@ -644,11 +644,11 @@ under the License.
<exclude>**/resources/**/font-awesome/**</exclude>
<exclude>**/resources/**/jquery*</exclude>
<exclude>**/resources/**/bootstrap*</exclude>
- <exclude>flink-clients/resources/web-docs/js/*d3.js</exclude>
- <exclude>flink-runtime/resources/web-docs-infoserver/css/sb-admin.css</exclude>
- <exclude>flink-runtime/resources/web-docs-infoserver/js/flot/*</exclude>
- <exclude>flink-runtime/resources/web-docs-infoserver/js/jcanvas.min.js</exclude>
- <exclude>flink-runtime/resources/web-docs-infoserver/js/timeline.js</exclude>
+ <exclude>flink-clients/src/main/resources/web-docs/js/*d3.js</exclude>
+ <exclude>flink-runtime/src/main/resources/web-docs-infoserver/css/sb-admin.css</exclude>
+ <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/flot/*</exclude>
+ <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/jcanvas.min.js</exclude>
+ <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/timeline.js</exclude>
<!-- Test Data. -->
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
<exclude>flink-addons/flink-avro/src/test/resources/avro/user.avsc</exclude>