You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:47 UTC

[51/82] [abbrv] incubator-flink git commit: Add debug guardians to suppress string generation which caused a significant performance loss.

Add debug guards (`log.isDebugEnabled` checks) to suppress log-string generation, which caused a significant performance loss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd9a1ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd9a1ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd9a1ba4

Branch: refs/heads/master
Commit: dd9a1ba43579f18bf536442ae3f5854c64a3791f
Parents: f726e55
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 10 10:50:45 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  2 +-
 .../apache/flink/runtime/ActorLogMessages.scala |  8 ++++--
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  6 ++++
 .../flink/runtime/jobmanager/JobManager.scala   | 30 +++++++++++++-------
 .../flink/runtime/taskmanager/TaskManager.scala | 14 +++++++--
 .../runtime/testingUtils/TestingCluster.scala   |  8 ------
 .../flink/test/util/AbstractTestBase.java       |  1 +
 .../test/util/ForkableFlinkMiniCluster.scala    |  4 ---
 .../src/test/resources/log4j-test.properties    |  2 +-
 pom.xml                                         | 12 ++++----
 10 files changed, 51 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 15b76f7..c615821 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -586,7 +586,7 @@ public final class ConfigConstants {
 
 	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
 
-	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 min";
+	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 m";
 
 	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index b39c11d..892e68d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -30,11 +30,15 @@ trait ActorLogMessages {
     override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
 
     override def apply(x: Any):Unit = {
-      log.debug("Received message {} from {}.", x, self.sender)
+      if(log.isDebugEnabled) {
+        log.debug(s"Received message ${x} from ${self.sender}.")
+      }
       val start = System.nanoTime()
       _receiveWithLogMessages(x)
       val duration = (System.nanoTime() - start) / 1000000
-      log.debug(s"Handled message {} in {} ms from {}.", x, duration, self.sender)
+      if(log.isDebugEnabled) {
+        log.debug(s"Handled message ${x} in ${duration} ms from ${self.sender}.")
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 0c5405e..b23f620 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -113,7 +113,13 @@ object AkkaUtils {
          |
          |  actor{
          |    default-dispatcher{
+         |      executor = "default-executor"
+         |
          |      throughput = ${akkaThroughput}
+         |
+         |      fork-join-executor {
+         |        parallelism-factor = 2.0
+         |      }
          |    }
          |  }
          |

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index eb0d79c..6dbcb67 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -92,7 +92,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
 
   instanceManager.addInstanceListener(scheduler)
 
-  log.info(s"Started job manager. Waiting for incoming messages.")
+  log.info("Started job manager. Waiting for incoming messages.")
 
   override def postStop(): Unit = {
     log.info(s"Stopping job manager ${self.path}.")
@@ -128,7 +128,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
             " null."))
         } else {
 
-          log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}}).")
+          log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
 
           // Create the user code class loader
           libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
@@ -153,8 +153,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
             throw new JobException("The user code class loader could not be initialized.")
           }
 
-          log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
-            .getName}).")
+          if(log.isDebugEnabled) {
+            log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
+              .getName}}).")
+          }
 
           for (vertex <- jobGraph.getVertices) {
             val executableClass = vertex.getInvokableClassName
@@ -169,13 +171,17 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
           // topological sorting of the job vertices
           val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
 
-          log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
-            .getJobID} (${jobGraph.getName}).")
+          if(log.isDebugEnabled) {
+            log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
+              .getJobID} (${jobGraph.getName}).")
+          }
 
           executionGraph.attachJobGraph(sortedTopology)
 
-          log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
-            s"(${jobGraph.getName}).")
+          if(log.isDebugEnabled) {
+            log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
+              s"(${jobGraph.getName}).")
+          }
 
           executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
 
@@ -261,7 +267,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
           val execution = executionGraph.getRegisteredExecutions().get(executionAttempt)
 
           if(execution == null){
-            log.error("Can not find Execution for attempt " + executionAttempt)
+            log.error(s"Can not find Execution for attempt ${executionAttempt}.")
             null
           }else{
             val slot = execution.getAssignedResource
@@ -289,7 +295,9 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
           null
       }
 
-      log.debug("Send next input split {}.", nextInputSplit)
+      if(log.isDebugEnabled) {
+        log.debug(s"Send next input split ${nextInputSplit}.")
+      }
       sender() ! NextInputSplit(nextInputSplit)
     }
 
@@ -297,7 +305,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
           log.info(s"Status of job ${jobID} (${executionGraph.getJobName}) changed to " +
-            s"${newJobStatus}${optionalMessage}.")
+            s"${newJobStatus}${if(optionalMessage == null) "" else optionalMessage}.")
 
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index b1cfda7..80af0e9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -311,7 +311,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
               libraryCacheManager.unregisterTask(jobID, executionID)
             } catch {
               case ioe: IOException =>
-                log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
+                if(log.isDebugEnabled) {
+                  log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
+                }
             }
           }
 
@@ -326,11 +328,17 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
     case LogMemoryUsage => {
       memoryMXBean foreach {
-        mxbean => log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean))
+        mxbean =>
+            if(log.isDebugEnabled) {
+              log.debug(TaskManager.getMemoryUsageStatsAsString(mxbean))
+            }
       }
 
       gcMXBeans foreach {
-        mxbeans => log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans))
+        mxbeans =>
+            if(log.isDebugEnabled) {
+              log.debug(TaskManager.getGarbageCollectorStatsAsString(mxbeans))
+            }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 3c11e1f..5a51265 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -37,14 +37,6 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
     cfg
   }
 
-  override def getJobManagerAkkaConfigString(): String = {
-    super.getJobManagerAkkaConfigString() + TestingUtils.getTestingSerializationBindings
-  }
-
-  override def getTaskManagerAkkaConfigString(index: Int): String = {
-    super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
-  }
-
   override def startJobManager(implicit system: ActorSystem) = {
     system.actorOf(Props(new JobManager(configuration) with TestingJobManager),
       JobManager.JOB_MANAGER_NAME)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c30d976..af92dbb 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -102,6 +102,7 @@ public abstract class AbstractTestBase {
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, 1000000);
 		this.executor = new ForkableFlinkMiniCluster(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index cf85533..8761ec6 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -52,10 +52,6 @@ LocalFlinkMiniCluster(userConfiguration) {
     super.generateConfiguration(config)
   }
 
-  override def getTaskManagerAkkaConfigString(index: Int): String = {
-    super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
-  }
-
   override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
     val config = configuration.clone()
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 2c2d4ff..0b686e5 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -17,7 +17,7 @@
 ################################################################################
 
 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=DEBUG, A1
+log4j.rootLogger=OFF, A1
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd9a1ba4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6739948..3cbfaa7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@ under the License.
 		<slf4j.version>1.7.7</slf4j.version>
 		<guava.version>17.0</guava.version>
 		<scala.version>2.10.4</scala.version>
-		<akka.version>2.3.6</akka.version>
+		<akka.version>2.3.7</akka.version>
 		<scala.binary.version>2.10</scala.binary.version>
 		<scala.macros.version>2.0.1</scala.macros.version>
 	</properties>
@@ -644,11 +644,11 @@ under the License.
 						<exclude>**/resources/**/font-awesome/**</exclude>
 						<exclude>**/resources/**/jquery*</exclude>
 						<exclude>**/resources/**/bootstrap*</exclude>
-						<exclude>flink-clients/resources/web-docs/js/*d3.js</exclude>
-						<exclude>flink-runtime/resources/web-docs-infoserver/css/sb-admin.css</exclude>
-						<exclude>flink-runtime/resources/web-docs-infoserver/js/flot/*</exclude>
-						<exclude>flink-runtime/resources/web-docs-infoserver/js/jcanvas.min.js</exclude>
-						<exclude>flink-runtime/resources/web-docs-infoserver/js/timeline.js</exclude>
+						<exclude>flink-clients/src/main/resources/web-docs/js/*d3.js</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/css/sb-admin.css</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/flot/*</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/jcanvas.min.js</exclude>
+						<exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/timeline.js</exclude>
 						<!-- Test Data. -->
 						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
 						<exclude>flink-addons/flink-avro/src/test/resources/avro/user.avsc</exclude>