Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2015/02/11 20:48:18 UTC

[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/385

    [FLINK-1529] [jobmanager] Improve error handling on JobManager startup

    This patch improves the JobManager's behavior on startup errors by pulling the heavy initialization of components (BlobServer, LibraryCache, InstanceManager) out of the JobManager actor, where construction happens asynchronously in the actor threads.
    
    It also adds more detailed error messages, lets the JobManager process fail in case of a startup error, and makes test extensions of the JobManager simpler.
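A minimal plain-Scala sketch of the idea, assuming hypothetical names (`Components`, `createComponents`, `Manager`, `start`) that are not Flink's API: the failure-prone initialization runs synchronously on the caller's thread, and the result is only handed to the (here simulated) actor afterwards, so a bad configuration surfaces as a `Failure` in `main` instead of inside an actor thread.

```scala
import scala.util.Try

// Hypothetical stand-in for the heavy JobManager members (BlobServer, etc.).
final case class Components(blobServerPort: Int, cleanupIntervalMs: Long)

object StartupSketch {

  // Heavy, failure-prone initialization runs synchronously on the caller's thread.
  def createComponents(port: Int, cleanupIntervalMs: Long): Components = {
    require(port >= 0 && port <= 65535, s"Invalid port: $port")
    require(cleanupIntervalMs > 0, s"Invalid cleanup interval: $cleanupIntervalMs")
    Components(port, cleanupIntervalMs)
  }

  // The simulated actor only receives ready-made components, so constructing it
  // cannot fail because of configuration problems.
  final class Manager(val components: Components)

  // Returns the manager, or the startup error, to the caller (e.g. main).
  def start(port: Int, cleanupIntervalMs: Long): Try[Manager] =
    Try(createComponents(port, cleanupIntervalMs)).map(new Manager(_))
}
```

In `main`, a `Failure` could then be logged and turned into a non-zero exit code, which is the behavior the patch describes for the real JobManager.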

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink jobmanager_startup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #385
    
----
commit 00659529605bcd5a3edd887d0e7ec1fa0ef51e7b
Author: Stephan Ewen <se...@apache.org>
Date:   2015-02-11T18:01:41Z

    [FLINK-1529] [jobmanager] Improve error handling on JobManager startup

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24657012
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    --- End diff --
    
    I'll leave it as it is. Looks simpler to me ;-)
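The `checkJavaVersion()` logic in the diff parses `java.version` with `substring(0, 3).toDouble`. That handles strings such as `1.6.0_45`, but under the version scheme introduced with Java 9 (JEP 223) the property can be as short as `9`, which would make `substring(0, 3)` throw. A more defensive parse could look like the following sketch; `JavaVersionSketch` and its methods are hypothetical helpers, not part of Flink.

```scala
object JavaVersionSketch {

  // Returns the major Java version for both the legacy scheme ("1.6.0_45" -> 6)
  // and the JEP 223 scheme ("9", "11.0.2", "17-ea" -> 9, 11, 17).
  // Returns None if no leading number can be found.
  def javaMajorVersion(version: String): Option[Int] = {
    // Keep only the leading "digits(.digits)*" part, dropping suffixes like "_45" or "-ea".
    val numeric = version.takeWhile(c => c.isDigit || c == '.')
    val parts = numeric.split('.').filter(_.nonEmpty).map(_.toInt)
    parts.toList match {
      case 1 :: minor :: _ => Some(minor) // legacy "1.x" scheme (Java 8 and earlier)
      case major :: _      => Some(major) // "N" scheme, Java 9 and later
      case Nil             => None
    }
  }

  // True if the version is older than Java 7, the condition the original check warns about.
  def isOlderThanJava7(version: String): Boolean =
    javaMajorVersion(version).exists(_ < 7)
}
```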


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/385


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24655790
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    --- End diff --
    
    Ah, of course. But we could do something like this to avoid it:
    
    ```
    import scala.util.{Failure, Try}

    val parsedArgs = Try(parseArgs(args)).recoverWith {
      case t: Throwable =>
        Failure(new Exception("Could not parse the command line arguments", t))
    }

    val result = parsedArgs.map {
      case (configuration, executionMode, listeningAddress) =>
        if (SecurityUtils.isSecurityEnabled) {
          LOG.info("Security is enabled. Starting secure JobManager.")
          SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
            override def run(): Unit = {
              runJobManager(configuration, executionMode, listeningAddress)
            }
          })
        } else {
          runJobManager(configuration, executionMode, listeningAddress)
        }
    }

    result.recover {
      case t: Throwable =>
        LOG.error("Failed to start JobManager.", t)
        System.exit(FAILURE_RETURN_CODE)
    }
    ```
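The `Try`-based structure proposed above can be exercised without Akka or Flink. In this sketch, `parseArgs` and `runJobManager` are hypothetical stubs, and the exit call is replaced by a returned exit code so the control flow can be checked. `Try.map` also captures (non-fatal) exceptions thrown by the mapped function, which is why one handler at the end covers both parsing and startup failures.

```scala
import scala.util.{Failure, Success, Try}

object TryStartupSketch {

  val FailureReturnCode = 1

  // Hypothetical stand-in for JobManager.parseArgs: returns (config, mode, address).
  def parseArgs(args: Array[String]): (Map[String, String], String, Option[(String, Int)]) = {
    if (args.isEmpty) throw new IllegalArgumentException("No execution mode given")
    (Map.empty, args(0), None)
  }

  // Hypothetical stand-in for runJobManager; fails for an unknown mode.
  def runJobManager(config: Map[String, String], mode: String,
                    address: Option[(String, Int)]): Unit = {
    if (mode != "local" && mode != "cluster") {
      throw new IllegalStateException(s"Unknown execution mode: $mode")
    }
  }

  // Returns the process exit code instead of calling System.exit, so it can be tested.
  def run(args: Array[String]): Int = {
    val parsedArgs = Try(parseArgs(args)).recoverWith {
      case t: Throwable =>
        Failure(new Exception("Could not parse the command line arguments", t))
    }

    // map() turns exceptions thrown inside runJobManager into a Failure as well.
    val result = parsedArgs.map {
      case (config, mode, address) => runJobManager(config, mode, address)
    }

    result match {
      case Success(_) => 0
      case Failure(t) =>
        Console.err.println(s"Failed to start JobManager: ${t.getMessage}")
        FailureReturnCode
    }
  }
}
```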


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24602575
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -653,11 +693,87 @@ object JobManager {
         (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries)
       }
     
    -  def startActor(configuration: Configuration)(implicit actorSystem: ActorSystem): ActorRef = {
    -    startActor(Props(classOf[JobManager], configuration))
    +  /**
    +   * Create the job manager members as (instanceManager, scheduler, libraryCacheManager,
    +   *              archiverProps, accumulatorManager, profiler, defaultExecutionRetries,
    +   *              delayBetweenRetries, timeout)
    +   *
    +   * @param configuration The configuration from which to parse the config values.
    +   * @return The members for a default JobManager.
    +   */
    +  def createJobManagerComponents(configuration: Configuration) :
    +    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
    +      Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = {
    +
    +    val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
    +
    +    val (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) =
    +      parseConfiguration(configuration)
    +
    +    val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
    +
    +    val profilerProps: Option[Props] = if (profilingEnabled) {
    +      Some(Props(classOf[JobManagerProfiler]))
    +    } else {
    +      None
    +    }
    +
    +    val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
    +
    +    var blobServer: BlobServer = null
    +    var instanceManager: InstanceManager = null
    +    var scheduler: FlinkScheduler = null
    +    var libraryCacheManager: BlobLibraryCacheManager = null
    +
    +    try {
    +      blobServer = new BlobServer(configuration)
    +      instanceManager = new InstanceManager()
    +      scheduler = new FlinkScheduler()
    +      libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
    +
    +      instanceManager.addInstanceListener(scheduler)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        if (libraryCacheManager != null) {
    +          libraryCacheManager.shutdown()
    +        }
    +        if (scheduler != null) {
    +          scheduler.shutdown()
    +        }
    +        if (instanceManager != null) {
    +          instanceManager.shutdown()
    +        }
    +        if (blobServer != null) {
    +          blobServer.shutdown()
    +        }
    +        throw t;
    +      }
    +    }
    +
    +    (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
    +      profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount)
    +  }
    +
    +  def startActor(configuration: Configuration, actorSystem: ActorSystem): ActorRef = {
    +
    +    val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
    +      profilerProps, executionRetries, delayBetweenRetries,
    +      timeout, _) = createJobManagerComponents(configuration)
    +
    +    val profiler: Option[ActorRef] =
    +                 profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME) )
    --- End diff --
    
    Nice functional style :-)
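The cleanup in `createJobManagerComponents` shuts down whatever was already created if a later constructor throws. The same pattern can be written generically, assuming a minimal hypothetical `Service` trait with a `shutdown()` method: components are created in order, and on failure the ones created so far are shut down in reverse order before the original exception is rethrown. Unlike the diff, this sketch also catches a failing `shutdown()` and attaches it as a suppressed exception, so it cannot mask the original error.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical minimal interface for components that need explicit shutdown.
trait Service {
  def name: String
  def shutdown(): Unit
}

object ComponentCleanupSketch {

  // Creates services in the given order. If any factory throws, the services created
  // so far are shut down in reverse creation order and the original exception is rethrown.
  def createAll(factories: Seq[() => Service]): Seq[Service] = {
    val created = ListBuffer.empty[Service]
    try {
      factories.foreach(factory => created += factory())
      created.toList
    } catch {
      case t: Throwable =>
        created.reverse.foreach { service =>
          // Keep the original error primary; record shutdown failures as suppressed.
          try service.shutdown()
          catch { case s: Throwable => t.addSuppressed(s) }
        }
        throw t
    }
  }
}
```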


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/385#issuecomment-74122398
  
    Nice work. Looks really good. There is only one minor issue on Travis: the ```JobManagerFailsITCase``` fails because the archive actor is not properly shut down before the JobManager is restarted.
    
    LGTM


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24602187
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    +      }
    +    }
    +
    +    try {
    +      if (SecurityUtils.isSecurityEnabled) {
             LOG.info("Security is enabled. Starting secure JobManager.")
             SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
               override def run(): Unit = {
    -            start(configuration, executionMode, listeningAddress)
    +            runJobManager(configuration, executionMode, listeningAddress)
               }
             })
           } else {
    -        start(configuration, executionMode, listeningAddress)
    +        runJobManager(configuration, executionMode, listeningAddress)
    +      }
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error("Failed to start JobManager.", t)
    +        System.exit(FAILURE_RETURN_CODE)
           }
    +    }
       }
     
    -  def start(configuration: Configuration, executionMode: ExecutionMode,
    -            listeningAddress : Option[(String, Int)]): Unit = {
    -    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress)
     
    -    startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
    +  def runJobManager(configuration: Configuration,
    +                    executionMode: ExecutionMode,
    +                    listeningAddress: Option[(String, Int)]) : Unit = {
    +
    +    LOG.info("Starting JobManager")
    +    LOG.debug("Starting JobManager actor system")
     
    -    if(executionMode.equals(LOCAL)){
    -      TaskManager.startActorWithConfiguration("", configuration,
    -        localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
    +    val jobManagerSystem = try {
    +      AkkaUtils.createActorSystem(configuration, listeningAddress)
         }
    +    catch {
    +      case t: Throwable => {
    +        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
    +          val cause = t.getCause()
    +          if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
    +            val address = listeningAddress match {
    +              case Some((host, port)) => host + ":" + port
    +              case None => "unknown"
    +            }
     
    -    jobManagerSystem.awaitTermination()
    +            throw new Exception("Unable to create JobManager at address " + address + ": " + cause.getMessage(), t)
    +          }
    +        }
    +        throw new Exception("Could not create JobManager actor system", t)
    +      }
    +    }
    +
    +    try {
    +      LOG.debug("Starting JobManager actor")
    +
    +      startActor(configuration, jobManagerSystem)
    +
    +      if(executionMode.equals(LOCAL)){
    +        LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
    +
    +        TaskManager.startActorWithConfiguration("", configuration,
    +          localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
    +      }
    +
    +      jobManagerSystem.awaitTermination()
    +    }
    +    catch {
    +      case t: Throwable => {
    +        Try(jobManagerSystem.shutdown())
    --- End diff --
    
    Do you want to swallow all potential exceptions of ```ActorSystem.shutdown``` intentionally?
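The catch block around `AkkaUtils.createActorSystem` in the diff above turns a bind failure into a message that names the address. A dependency-free sketch of that translation follows. Since Netty's `ChannelException` is not available here, it walks the whole cause chain looking for `java.net.BindException` instead of checking one specific wrapper type; `BindErrorSketch` and its methods are hypothetical.

```scala
import java.net.BindException

object BindErrorSketch {

  // Finds the first BindException in the cause chain of t (including t itself).
  def findBindException(t: Throwable): Option[BindException] =
    Iterator.iterate(t)(_.getCause).takeWhile(_ != null).collectFirst {
      case b: BindException => b
    }

  // Wraps a startup error in an exception whose message names the address
  // if the underlying problem was a port that could not be bound.
  def describeStartupFailure(t: Throwable, listeningAddress: Option[(String, Int)]): Exception =
    findBindException(t) match {
      case Some(bind) =>
        val address = listeningAddress match {
          case Some((host, port)) => s"$host:$port"
          case None               => "unknown"
        }
        new Exception(s"Unable to create JobManager at address $address: ${bind.getMessage}", t)
      case None =>
        new Exception("Could not create JobManager actor system", t)
    }
}
```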


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24601897
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    --- End diff --
    
    Do we need the ```null``` expression after the ```System.exit```?


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24653915
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    --- End diff --
    
    We have to, to silence the compiler: it does not know that `System.exit()` never returns, so it cannot type the catch branch as `Nothing` when assigning the triplet value.
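For comparison, the standard library's `sys.exit` is declared with result type `Nothing`, so a catch branch ending in it is typed as `Nothing` and the whole `try` expression keeps the tuple type, with no placeholder `null`. A minimal sketch, where `ExitSketch` and its `parseArgs` stub are hypothetical:

```scala
object ExitSketch {

  // Hypothetical stand-in for parseArgs.
  def parseArgs(args: Array[String]): (String, Int) = {
    if (args.length != 2) throw new IllegalArgumentException("Expected <host> <port>")
    (args(0), args(1).toInt)
  }

  def parseOrExit(args: Array[String]): (String, Int) = {
    // sys.exit has result type Nothing, so the try/catch expression keeps the
    // tuple type and no trailing `null` is required.
    val (host, port) =
      try {
        parseArgs(args)
      } catch {
        case t: Exception =>
          Console.err.println(s"Could not parse arguments: ${t.getMessage}")
          sys.exit(1)
      }
    (host, port)
  }
}
```

With invalid arguments `parseOrExit` terminates the JVM, so only the success path is safe to call from a test.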


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24601755
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -149,7 +134,7 @@ Actor with ActorLogMessages with ActorLogging {
     
       override def receiveWithLogMessages: Receive = {
         case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) =>
    -      val taskManager = sender
    +      val taskManager = sender()
    --- End diff --
    
    This does not work with older Akka versions, which we are using with the Hadoop-2.0.0-alpha profile. I think in older Akka versions `sender` is declared without parentheses, so `sender()` does not compile there.


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24653930
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) {
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK community. " +
    -        "Flink currently supports Java 6, but may not in future releases," +
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    +      }
    +    }
    +
    +    try {
    +      if (SecurityUtils.isSecurityEnabled) {
             LOG.info("Security is enabled. Starting secure JobManager.")
             SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
               override def run(): Unit = {
    -            start(configuration, executionMode, listeningAddress)
    +            runJobManager(configuration, executionMode, listeningAddress)
               }
             })
           } else {
    -        start(configuration, executionMode, listeningAddress)
    +        runJobManager(configuration, executionMode, listeningAddress)
    +      }
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error("Failed to start JobManager.", t)
    +        System.exit(FAILURE_RETURN_CODE)
           }
    +    }
       }
     
    -  def start(configuration: Configuration, executionMode: ExecutionMode,
    -            listeningAddress : Option[(String, Int)]): Unit = {
    -    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress)
     
    -    startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem)
    +  def runJobManager(configuration: Configuration,
    +                    executionMode: ExecutionMode,
    +                    listeningAddress: Option[(String, Int)]) : Unit = {
    +
    +    LOG.info("Starting JobManager")
    +    LOG.debug("Starting JobManager actor system")
     
    -    if(executionMode.equals(LOCAL)){
    -      TaskManager.startActorWithConfiguration("", configuration,
    -        localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
    +    val jobManagerSystem = try {
    +      AkkaUtils.createActorSystem(configuration, listeningAddress)
         }
    +    catch {
    +      case t: Throwable => {
    +        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
    +          val cause = t.getCause()
    +          if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
    +            val address = listeningAddress match {
    +              case Some((host, port)) => host + ":" + port
    +              case None => "unknown"
    +            }
     
    -    jobManagerSystem.awaitTermination()
    +            throw new Exception("Unable to create JobManager at address " + address + ": " + cause.getMessage(), t)
    +          }
    +        }
    +        throw new Exception("Could not create JobManager actor system", t)
    +      }
    +    }
    +
    +    try {
    +      LOG.debug("Starting JobManager actor")
    +
    +      startActor(configuration, jobManagerSystem)
    +
    +      if(executionMode.equals(LOCAL)){
    +        LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
    +
    +        TaskManager.startActorWithConfiguration("", configuration,
    +          localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem)
    +      }
    +
    +      jobManagerSystem.awaitTermination()
    +    }
    +    catch {
    +      case t: Throwable => {
    +        Try(jobManagerSystem.shutdown())
    --- End diff --
    
    I'll log them
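Logging instead of swallowing can be done by inspecting the `Try` returned for the shutdown call. A minimal sketch, assuming a hypothetical `shutdownQuietly` helper and using `java.util.logging` in place of Flink's SLF4J logger:

```scala
import java.util.logging.{Level, Logger}
import scala.util.{Failure, Success, Try}

object ShutdownLoggingSketch {

  private val log = Logger.getLogger("ShutdownLoggingSketch")

  // Runs a shutdown action; a failure is logged rather than silently dropped, and it is
  // not rethrown, so it cannot hide the original startup error.
  def shutdownQuietly(what: String)(shutdown: => Unit): Option[Throwable] =
    Try(shutdown) match {
      case Success(_) => None
      case Failure(t) =>
        log.log(Level.WARNING, s"Failed to shut down $what", t)
        Some(t)
    }
}
```

In the `catch` block of `runJobManager`, `Try(jobManagerSystem.shutdown())` could then become `shutdownQuietly("JobManager actor system")(jobManagerSystem.shutdown())`, followed by rethrowing the original error.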


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/385#issuecomment-74236977
  
    I think you have to add ```archive ! PoisonPill``` to the ```postStop``` method of the JobManager to shut down the archive actor.

