Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2015/10/02 14:39:08 UTC

[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1213

    [FLINK-2790] [yarn] [ha] Adds high availability support for Yarn

    Adds high availability support for Yarn by exploiting Yarn's functionality to restart a failed application master. Depending on the Hadoop version, each version's behaviour is an increasing superset of the preceding version's functionality (see the sketch after the version list below).
    
    ### 2.3.0 <= version < 2.4.0
    
    * Sets the number of application attempts to the configuration value `yarn.application-attempts`. This means that the application can be restarted `yarn.application-attempts` times before Yarn fails the application. In case of an application master failure, all TaskManager containers will also be killed.
    
    ### 2.4.0 <= version < 2.6.0
    
    * Additionally, enables containers to be kept across application attempts. This avoids killing the TaskManager containers in case of an application master failure, so start-up is faster and the user does not have to wait for the container resources to be obtained again.
    
    ### 2.6.0 <= version
    
    * Sets the attempt failures validity interval to the Akka timeout value. The attempt failures validity interval means that an application is only killed after the system has seen the maximum number of application attempts within one interval. This prevents a long-running job from depleting its application attempts.
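    
    For illustration, a minimal sketch of how the three settings above map onto Yarn's `ApplicationSubmissionContext` API. This is not the PR's exact code (the real client has to guard the newer setters on older Hadoop versions, see the reflection note further down in this thread); it assumes compiling against Hadoop 2.6.0+ and that the attempt count and the Akka timeout have already been read from the Flink configuration:
    
    ```java
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    
    public final class YarnHaSettingsSketch {
    
        /**
         * Applies the three version-dependent HA settings. All three setters
         * are only available together from Hadoop 2.6.0 on.
         */
        public static void apply(
                ApplicationSubmissionContext appContext,
                int applicationAttempts,  // value of `yarn.application-attempts`
                long akkaTimeoutMillis) { // Akka ask timeout from the Flink config
    
            // 2.3.0+: let Yarn restart a failed application master
            appContext.setMaxAppAttempts(applicationAttempts);
    
            // 2.4.0+: keep TaskManager containers alive across AM restarts
            appContext.setKeepContainersAcrossApplicationAttempts(true);
    
            // 2.6.0+: only attempts failing within this interval count towards
            // the maximum, so a long-running job cannot deplete its attempts
            appContext.setAttemptFailuresValidityInterval(akkaTimeoutMillis);
        }
    }
    ```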
    
    This PR also refactors the different Yarn components to allow the start-up of testing actors within Yarn. Furthermore, the `JobManager` start-up logic is slightly extended to allow code reuse in the `ApplicationMasterBase`.
    
    The HA functionality is tested via the `YARNHighAvailabilityITCase`, where an application master is killed multiple times. Each time, the test checks that the single TaskManager successfully reconnects to the newly started `YarnJobManager`. In case of version `2.3.0`, the `TaskManager` is restarted instead.
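    
    The test itself synchronizes via Akka messages, but an AM restart can also be observed from the outside through Yarn's application report. A hypothetical helper (names and polling interval are illustrative, not taken from the PR) might look like this:
    
    ```java
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    
    public final class AmRestartProbe {
    
        /** Blocks until Yarn reports a newer application attempt than the given one. */
        public static void awaitNewAttempt(
                YarnClient yarnClient, ApplicationId appId, int previousAttemptId)
                throws Exception {
            while (true) {
                ApplicationReport report = yarnClient.getApplicationReport(appId);
                int attemptId = report.getCurrentApplicationAttemptId().getAttemptId();
                if (attemptId > previousAttemptId) {
                    return; // Yarn has started a new application master attempt
                }
                Thread.sleep(500); // illustrative polling interval
            }
        }
    }
    ```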

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink yarnHA

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1213
    
----
commit 1a18172ae69eb576638704f8e143a921aa8630d5
Author: Till Rohrmann <tr...@apache.org>
Date:   2015-09-01T14:35:48Z

    [FLINK-2790] [yarn] [ha] Adds high availability support for Yarn

commit 5359676556d16610303d4f36fcbe5320ef4e6643
Author: Till Rohrmann <tr...@apache.org>
Date:   2015-09-23T15:42:57Z

    Refactors JobManager's start actors method to be reusable

commit d6a47cd8ad265c5122d1a67c09773cbc5a491261
Author: Till Rohrmann <tr...@apache.org>
Date:   2015-09-24T12:55:18Z

    Yarn refactoring to introduce yarn testing functionality

commit f9578f136dd41cd9829d712f7c62a59c9ea8e194
Author: Till Rohrmann <tr...@apache.org>
Date:   2015-09-28T16:21:30Z

    Added support for testing yarn cluster. Extracted JobManager's and TaskManager's testing messages into stackable traits.

commit dbfa16438ad9d7d61e8d1a582c8cd1de9352078e
Author: Till Rohrmann <tr...@apache.org>
Date:   2015-09-29T15:05:01Z

    Implemented YarnHighAvailabilityITCase using Akka messages for synchronization.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41374461
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, Cancellable, ActorRef}
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
    +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
    +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
    +
    +import scala.collection.mutable
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a JobManager with messages for testing purpose.
    +  *
    --- End diff --
    
    empty line



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-146136443
  
    I found the issue. My code did not properly reflect the methods `setKeepContainersAcrossApplicationAttempts` and `setAttemptFailuresValidityInterval`. With the fix, already started containers are retained. Tested it with Yarn 2.7.1.
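    
    For illustration, such a reflective call might look roughly like the following sketch, which degrades to a no-op when the setter does not exist (the method name string has to match the Hadoop API exactly, which is what the fix corrects):
    
    ```java
    import java.lang.reflect.Method;
    
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    
    public final class ReflectiveSetterSketch {
    
        /** Keeps containers across attempts; silently a no-op on Hadoop < 2.4.0. */
        public static void trySetKeepContainers(ApplicationSubmissionContext appContext) {
            try {
                Method m = ApplicationSubmissionContext.class.getMethod(
                    "setKeepContainersAcrossApplicationAttempts", boolean.class);
                m.invoke(appContext, true);
            } catch (NoSuchMethodException e) {
                // Hadoop < 2.4.0: containers cannot be kept across attempts
            } catch (Exception e) {
                throw new RuntimeException("Could not set keep-containers flag", e);
            }
        }
    }
    ```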



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41387583
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, Cancellable, ActorRef}
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
    +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
    +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
    +
    +import scala.collection.mutable
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a JobManager with messages for testing purpose.
    +  *
    --- End diff --
    
    Let's see whether it is still recognized as ScalaDoc



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41374493
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, Cancellable, ActorRef}
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
    +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
    +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
    +
    +import scala.collection.mutable
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a JobManager with messages for testing purpose.
    +  *
    +  */
    +trait TestingJobManagerLike extends FlinkActor {
    +  that: JobManager =>
    +
    +  import scala.collection.JavaConverters._
    +  import context._
    +
    +  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
    +
    +  val waitForAllVerticesToBeRunningOrFinished =
    +    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +
    +  var periodicCheck: Option[Cancellable] = None
    +
    +  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
    +    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
    +
    +  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
    +
    +  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
    +
    +  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
    +    new Ordering[(Int, ActorRef)] {
    +      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
    +    })
    +
    +  var disconnectDisabled = false
    +
    +  abstract override def handleMessage: Receive = {
    +    handleTestingMessage orElse super.handleMessage
    +  }
    +
    +  def handleTestingMessage: Receive = {
    +    case Alive => sender() ! Acknowledge
    +
    +    case RequestExecutionGraph(jobID) =>
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
    +          ExecutionGraphFound(
    +            jobID,
    +            executionGraph)
    +        )
    +
    +        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
    +      }
    +
    +    case WaitForAllVerticesToBeRunning(jobID) =>
    +      if(checkIfAllVerticesRunning(jobID)){
    +        sender() ! decorateMessage(AllVerticesRunning(jobID))
    +      }else{
    +        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
    +        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
    +
    +        if(periodicCheck.isEmpty){
    +          periodicCheck =
    +            Some(
    +              context.system.scheduler.schedule(
    +                0 seconds,
    +                200 millis,
    +                self,
    +                decorateMessage(NotifyListeners)
    +              )
    +            )
    +        }
    +      }
    +    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
    +      if(checkIfAllVerticesRunningOrFinished(jobID)){
    +        sender() ! decorateMessage(AllVerticesRunning(jobID))
    +      }else{
    +        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
    +        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
    +
    +        if(periodicCheck.isEmpty){
    +          periodicCheck =
    +            Some(
    +              context.system.scheduler.schedule(
    +                0 seconds,
    +                200 millis,
    +                self,
    +                decorateMessage(NotifyListeners)
    +              )
    +            )
    +        }
    +      }
    +
    +    case NotifyListeners =>
    +      for(jobID <- currentJobs.keySet){
    +        notifyListeners(jobID)
    +      }
    +
    +      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
    +        periodicCheck foreach { _.cancel() }
    +        periodicCheck = None
    +      }
    +
    +
    +    case NotifyWhenJobRemoved(jobID) =>
    +      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
    +
    +      val responses = gateways.map{
    +        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
    +      }
    +
    +      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
    +
    +      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
    +
    +      import context.dispatcher
    +      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
    +
    +    case CheckIfJobRemoved(jobID) =>
    +      if(currentJobs.contains(jobID)) {
    +        context.system.scheduler.scheduleOnce(
    +          200 milliseconds,
    +          self,
    +          decorateMessage(CheckIfJobRemoved(jobID))
    +        )(context.dispatcher, sender())
    +      } else {
    +        sender() ! decorateMessage(true)
    +      }
    +
    +    case NotifyWhenTaskManagerTerminated(taskManager) =>
    +      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
    +      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
    +
    +    case msg@Terminated(taskManager) =>
    +      super.handleMessage(msg)
    +
    +      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
    +        _ foreach {
    +          listener =>
    +            listener ! decorateMessage(TaskManagerTerminated(taskManager))
    +        }
    +      }
    +
    +    case NotifyWhenAccumulatorChange(jobID) =>
    +
    +      val (updated, registered) = waitForAccumulatorUpdate.
    +        getOrElse(jobID, (false, Set[ActorRef]()))
    +      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
    +      sender ! true
    +
    +    /**
    +     * Notification from the task manager that changed accumulator are transferred on next
    +     * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
    +     */
    +    case AccumulatorsChanged(jobID: JobID) =>
    +      waitForAccumulatorUpdate.get(jobID) match {
    +        case Some((updated, registered)) =>
    +          waitForAccumulatorUpdate.put(jobID, (true, registered))
    +        case None =>
    +      }
    +
    +    /**
    +     * Disabled async processing of accumulator values and send accumulators to the listeners if
    +     * we previously received an [[AccumulatorsChanged]] message.
    +     */
    +    case msg : Heartbeat =>
    +      super.handleMessage(msg)
    +
    +      waitForAccumulatorUpdate foreach {
    +        case (jobID, (updated, actors)) if updated =>
    +          currentJobs.get(jobID) match {
    +            case Some((graph, jobInfo)) =>
    +              val flinkAccumulators = graph.getFlinkAccumulators
    +              val userAccumulators = graph.aggregateUserAccumulators
    +              actors foreach {
    +                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
    +              }
    +            case None =>
    +          }
    +          waitForAccumulatorUpdate.put(jobID, (false, actors))
    +        case _ =>
    +      }
    +
    +    case RequestWorkingTaskManager(jobID) =>
    +      currentJobs.get(jobID) match {
    +        case Some((eg, _)) =>
    +          if(eg.getAllExecutionVertices.asScala.isEmpty){
    +            sender ! decorateMessage(WorkingTaskManager(None))
    +          } else {
    +            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
    +
    +            if(resource == null){
    +              sender ! decorateMessage(WorkingTaskManager(None))
    +            } else {
    +              sender ! decorateMessage(
    +                WorkingTaskManager(
    +                  Some(resource.getInstance().getActorGateway)
    +                )
    +              )
    +            }
    +          }
    +        case None => sender ! decorateMessage(WorkingTaskManager(None))
    +      }
    +
    --- End diff --
    
    empty line



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-145451351
  
    Looks like a lot of work to figure out the different version behaviours. Good job and thanks for the clear explanation. :)
    
    I guess Robert meant by "not restarting properly" that the TMs were restarted as well.
    
    How does the way you kill the AM affect recovery?
    
    I will try this out later today.



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-146136479
  
    Good catch @rmetzger :-)



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1213



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-146150504
  
    The code looks good. :-) I like the changes around the testing classes and the changes to the retrieval utils (that one I've looked into in more detail). I didn't check the YARN logic in detail though. I'm going to try it out now locally and report back. I expect that to work just fine and then we can safely merge this I think. :)



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41374384
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---
    @@ -103,18 +103,17 @@ class TestingCluster(
           instanceManager,
           scheduler,
           libraryCacheManager,
    -      _,
           executionRetries,
           delayBetweenRetries,
           timeout,
           archiveCount,
    -      leaderElectionService) = JobManager.createJobManagerComponents(config)
    +      leaderElectionService) = JobManager.createJobManagerComponents(
    +      config,
    --- End diff --
    
    indentation



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-145404233
  
    I've looked a bit over the code and it all looks like good changes.
    
    I've tested the code on a Hadoop 2.6.0 cluster and the YARN session was restarting properly. But I think that's not the expected behavior.
    We actually want the TMs to stay alive and reconnect to the new AM.



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41387526
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---
    @@ -103,18 +103,17 @@ class TestingCluster(
           instanceManager,
           scheduler,
           libraryCacheManager,
    -      _,
           executionRetries,
           delayBetweenRetries,
           timeout,
           archiveCount,
    -      leaderElectionService) = JobManager.createJobManagerComponents(config)
    +      leaderElectionService) = JobManager.createJobManagerComponents(
    +      config,
    --- End diff --
    
    Good catch



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-145450115
  
    I think TMs are only kept alive if their containers have been properly started. If the AM happens to die while the TM containers are starting up, I think they will be terminated as well. Another question: how did you kill the AM, and what do you mean by "[...] restarting properly. But I think that's not the expected behavior"?



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41387623
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, Cancellable, ActorRef}
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
    +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
    +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
    +
    +import scala.collection.mutable
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a JobManager with messages for testing purpose.
    +  *
    +  */
    +trait TestingJobManagerLike extends FlinkActor {
    +  that: JobManager =>
    +
    +  import scala.collection.JavaConverters._
    +  import context._
    +
    +  val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +  val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
    +
    +  val waitForAllVerticesToBeRunningOrFinished =
    +    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +
    +  var periodicCheck: Option[Cancellable] = None
    +
    +  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
    +    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
    +
    +  val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
    +
    +  val waitForLeader = scala.collection.mutable.HashSet[ActorRef]()
    +
    +  val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
    +    new Ordering[(Int, ActorRef)] {
    +      override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
    +    })
    +
    +  var disconnectDisabled = false
    +
    +  abstract override def handleMessage: Receive = {
    +    handleTestingMessage orElse super.handleMessage
    +  }
    +
    +  def handleTestingMessage: Receive = {
    +    case Alive => sender() ! Acknowledge
    +
    +    case RequestExecutionGraph(jobID) =>
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, jobInfo)) => sender() ! decorateMessage(
    +          ExecutionGraphFound(
    +            jobID,
    +            executionGraph)
    +        )
    +
    +        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
    +      }
    +
    +    case WaitForAllVerticesToBeRunning(jobID) =>
    +      if(checkIfAllVerticesRunning(jobID)){
    +        sender() ! decorateMessage(AllVerticesRunning(jobID))
    +      }else{
    +        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
    +        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
    +
    +        if(periodicCheck.isEmpty){
    +          periodicCheck =
    +            Some(
    +              context.system.scheduler.schedule(
    +                0 seconds,
    +                200 millis,
    +                self,
    +                decorateMessage(NotifyListeners)
    +              )
    +            )
    +        }
    +      }
    +    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
    +      if(checkIfAllVerticesRunningOrFinished(jobID)){
    +        sender() ! decorateMessage(AllVerticesRunning(jobID))
    +      }else{
    +        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
    +        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
    +
    +        if(periodicCheck.isEmpty){
    +          periodicCheck =
    +            Some(
    +              context.system.scheduler.schedule(
    +                0 seconds,
    +                200 millis,
    +                self,
    +                decorateMessage(NotifyListeners)
    +              )
    +            )
    +        }
    +      }
    +
    +    case NotifyListeners =>
    +      for(jobID <- currentJobs.keySet){
    +        notifyListeners(jobID)
    +      }
    +
    +      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
    +        periodicCheck foreach { _.cancel() }
    +        periodicCheck = None
    +      }
    +
    +
    +    case NotifyWhenJobRemoved(jobID) =>
    +      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
    +
    +      val responses = gateways.map{
    +        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
    +      }
    +
    +      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
    +
    +      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
    +
    +      import context.dispatcher
    +      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
    +
    +    case CheckIfJobRemoved(jobID) =>
    +      if(currentJobs.contains(jobID)) {
    +        context.system.scheduler.scheduleOnce(
    +          200 milliseconds,
    +          self,
    +          decorateMessage(CheckIfJobRemoved(jobID))
    +        )(context.dispatcher, sender())
    +      } else {
    +        sender() ! decorateMessage(true)
    +      }
    +
    +    case NotifyWhenTaskManagerTerminated(taskManager) =>
    +      val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
    +      waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender)
    +
    +    case msg@Terminated(taskManager) =>
    +      super.handleMessage(msg)
    +
    +      waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach {
    +        _ foreach {
    +          listener =>
    +            listener ! decorateMessage(TaskManagerTerminated(taskManager))
    +        }
    +      }
    +
    +    case NotifyWhenAccumulatorChange(jobID) =>
    +
    +      val (updated, registered) = waitForAccumulatorUpdate.
    +        getOrElse(jobID, (false, Set[ActorRef]()))
    +      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
    +      sender ! true
    +
    +    /**
    +     * Notification from the task manager that changed accumulator are transferred on next
    +     * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
    +     */
    +    case AccumulatorsChanged(jobID: JobID) =>
    +      waitForAccumulatorUpdate.get(jobID) match {
    +        case Some((updated, registered)) =>
    +          waitForAccumulatorUpdate.put(jobID, (true, registered))
    +        case None =>
    +      }
    +
    +    /**
    +     * Disabled async processing of accumulator values and send accumulators to the listeners if
    +     * we previously received an [[AccumulatorsChanged]] message.
    +     */
    +    case msg : Heartbeat =>
    +      super.handleMessage(msg)
    +
    +      waitForAccumulatorUpdate foreach {
    +        case (jobID, (updated, actors)) if updated =>
    +          currentJobs.get(jobID) match {
    +            case Some((graph, jobInfo)) =>
    +              val flinkAccumulators = graph.getFlinkAccumulators
    +              val userAccumulators = graph.aggregateUserAccumulators
    +              actors foreach {
    +                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
    +              }
    +            case None =>
    +          }
    +          waitForAccumulatorUpdate.put(jobID, (false, actors))
    +        case _ =>
    +      }
    +
    +    case RequestWorkingTaskManager(jobID) =>
    +      currentJobs.get(jobID) match {
    +        case Some((eg, _)) =>
    +          if(eg.getAllExecutionVertices.asScala.isEmpty){
    +            sender ! decorateMessage(WorkingTaskManager(None))
    +          } else {
    +            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
    +
    +            if(resource == null){
    +              sender ! decorateMessage(WorkingTaskManager(None))
    +            } else {
    +              sender ! decorateMessage(
    +                WorkingTaskManager(
    +                  Some(resource.getInstance().getActorGateway)
    +                )
    +              )
    +            }
    +          }
    +        case None => sender ! decorateMessage(WorkingTaskManager(None))
    +      }
    +
    --- End diff --
    
    fixed



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41374754
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
    @@ -0,0 +1,867 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.client.CliFrontend;
    +import org.apache.flink.client.FlinkYarnSessionCli;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
    +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsAction;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.hadoop.yarn.api.records.ApplicationReport;
    +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    +import org.apache.hadoop.yarn.api.records.LocalResource;
    +import org.apache.hadoop.yarn.api.records.NodeReport;
    +import org.apache.hadoop.yarn.api.records.NodeState;
    +import org.apache.hadoop.yarn.api.records.QueueInfo;
    +import org.apache.hadoop.yarn.api.records.Resource;
    +import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
    +import org.apache.hadoop.yarn.client.api.YarnClient;
    +import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.exceptions.YarnException;
    +import org.apache.hadoop.yarn.util.Records;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.PrintStream;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    +* All classes in this package contain code taken from
    +* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
    +* and
    +* https://github.com/hortonworks/simple-yarn-app
    +* and
    +* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
    +*
    +* The Flink jar is uploaded to HDFS by this client.
    +* The application master and all the TaskManager containers get the jar file downloaded
    +* by YARN into their local fs.
    +*
    +*/
    +public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
    +
    +	/**
    +	 * Constants,
    +	 * all starting with ENV_ are used as environment variables to pass values from the Client
    +	 * to the Application Master.
    +	 */
    +	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
    +	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
    +	public final static String ENV_APP_ID = "_APP_ID";
    +	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
    +	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
    +	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
    +	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
    +	public static final String ENV_SLOTS = "_SLOTS";
    +	public static final String ENV_DETACHED = "_DETACHED";
    +	public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
    +	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
    +
    +
    +	/**
    +	 * Minimum memory requirements, checked by the Client.
    +	 */
    +	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
    +	private static final int MIN_TM_MEMORY = 768;
    +
    +	private Configuration conf;
    +	private YarnClient yarnClient;
    +	private YarnClientApplication yarnApplication;
    +
    +
    +	/**
    +	 * Files (usually in a distributed file system) used for the YARN session of Flink.
    +	 * Contains configuration files and jar files.
    +	 */
    +	private Path sessionFilesDir;
    +
    +	/**
    +	 * If the user has specified a different number of slots, we store them here
    +	 */
    +	private int slots = -1;
    +
    +	private int jobManagerMemoryMb = 1024;
    +
    +	private int taskManagerMemoryMb = 1024;
    +
    +	private int taskManagerCount = 1;
    +
    +	private String yarnQueue = null;
    +
    +	private String configurationDirectory;
    +
    +	private Path flinkConfigurationPath;
    +
    +	private Path flinkLoggingConfigurationPath; // optional
    +
    +	private Path flinkJarPath;
    +
    +	private String dynamicPropertiesEncoded;
    +
    +	private List<File> shipFiles = new ArrayList<File>();
    +	private org.apache.flink.configuration.Configuration flinkConfiguration;
    +
    +	private boolean detached;
    +	private boolean streamingMode;
    +
    +	private String customName = null;
    +
    +	public FlinkYarnClientBase() {
    +		conf = new YarnConfiguration();
    +		if(this.yarnClient == null) {
    +			// Create yarnClient
    +			yarnClient = YarnClient.createYarnClient();
    +			yarnClient.init(conf);
    +			yarnClient.start();
    +		}
    +
    +		// for unit tests only
    +		if(System.getenv("IN_TESTS") != null) {
    +			try {
    +				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
    +			} catch (Throwable t) {
    +				throw new RuntimeException("Error",t);
    +			}
    +		}
    +	}
    +
    +	protected abstract Class<?> getApplicationMasterClass();
    +
    +	@Override
    +	public void setJobManagerMemory(int memoryMb) {
    +		if(memoryMb < MIN_JM_MEMORY) {
    +			throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
    +				+ "of " + MIN_JM_MEMORY+ " MB");
    +		}
    +		this.jobManagerMemoryMb = memoryMb;
    +	}
    +
    +	@Override
    +	public void setTaskManagerMemory(int memoryMb) {
    +		if(memoryMb < MIN_TM_MEMORY) {
    +			throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
    +				+ "of " + MIN_TM_MEMORY+ " MB");
    +		}
    +		this.taskManagerMemoryMb = memoryMb;
    +	}
    +
    +	@Override
    +	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
    +		this.flinkConfiguration = conf;
    +	}
    +
    +	@Override
    +	public void setTaskManagerSlots(int slots) {
    +		if(slots <= 0) {
    +			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
    +		}
    +		this.slots = slots;
    +	}
    +
    +	@Override
    +	public int getTaskManagerSlots() {
    +		return this.slots;
    +	}
    +
    +	@Override
    +	public void setQueue(String queue) {
    +		this.yarnQueue = queue;
    +	}
    +
    +	@Override
    +	public void setLocalJarPath(Path localJarPath) {
    +		if(!localJarPath.toString().endsWith("jar")) {
    +			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
    +		}
    +		this.flinkJarPath = localJarPath;
    +	}
    +
    +	@Override
    +	public void setConfigurationFilePath(Path confPath) {
    +		flinkConfigurationPath = confPath;
    +	}
    +
    +	public void setConfigurationDirectory(String configurationDirectory) {
    +		this.configurationDirectory = configurationDirectory;
    +	}
    +
    +	@Override
    +	public void setFlinkLoggingConfigurationPath(Path logConfPath) {
    +		flinkLoggingConfigurationPath = logConfPath;
    +	}
    +
    +	@Override
    +	public Path getFlinkLoggingConfigurationPath() {
    +		return flinkLoggingConfigurationPath;
    +	}
    +
    +	@Override
    +	public void setTaskManagerCount(int tmCount) {
    +		if(tmCount < 1) {
    +			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
    +		}
    +		this.taskManagerCount = tmCount;
    +	}
    +
    +	@Override
    +	public int getTaskManagerCount() {
    +		return this.taskManagerCount;
    +	}
    +
    +	@Override
    +	public void setShipFiles(List<File> shipFiles) {
    +		for(File shipFile: shipFiles) {
    +			// remove uberjar from ship list (by default everything in the lib/ folder is added to
    +			// the list of files to ship, but we handle the uberjar separately.
    +			if(!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))) {
    +				this.shipFiles.add(shipFile);
    +			}
    +		}
    +	}
    +
    +	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
    +		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
    +	}
    +
    +	@Override
    +	public String getDynamicPropertiesEncoded() {
    +		return this.dynamicPropertiesEncoded;
    +	}
    +
    +
    +	public void isReadyForDeployment() throws YarnDeploymentException {
    +		if(taskManagerCount <= 0) {
    +			throw new YarnDeploymentException("Taskmanager count must be positive");
    +		}
    +		if(this.flinkJarPath == null) {
    +			throw new YarnDeploymentException("The Flink jar path is null");
    +		}
    +		if(this.configurationDirectory == null) {
    +			throw new YarnDeploymentException("Configuration directory not set");
    +		}
    +		if(this.flinkConfigurationPath == null) {
    +			throw new YarnDeploymentException("Configuration path not set");
    +		}
    +		if(this.flinkConfiguration == null) {
    +			throw new YarnDeploymentException("Flink configuration object has not been set");
    +		}
    +
    +		// check if required Hadoop environment variables are set. If not, warn user
    +		if(System.getenv("HADOOP_CONF_DIR") == null &&
    +			System.getenv("YARN_CONF_DIR") == null) {
    +			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
    +				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
    +				"configuration for accessing YARN.");
    +		}
    +	}
    +
    +	public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
    +		for(int i = 0; i < nodeManagers.length; i++) {
    +			if(nodeManagers[i] >= toAllocate) {
    +				nodeManagers[i] -= toAllocate;
    +				return true;
    +			}
    +		}
    +		return false;
    +	}
    +
    +	@Override
    +	public void setDetachedMode(boolean detachedMode) {
    +		this.detached = detachedMode;
    +	}
    +
    +	@Override
    +	public boolean isDetached() {
    +		return detached;
    +	}
    +
    +	public AbstractFlinkYarnCluster deploy() throws Exception {
    +
    +		UserGroupInformation.setConfiguration(conf);
    +		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    +
    +		if (UserGroupInformation.isSecurityEnabled()) {
    +			if (!ugi.hasKerberosCredentials()) {
    +				throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
    +					"You may use kinit to authenticate and request a TGT from the Kerberos server.");
    +			}
    +			return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
    +				@Override
    +				public AbstractFlinkYarnCluster run() throws Exception {
    +					return deployInternal();
    +				}
    +			});
    +		} else {
    +			return deployInternal();
    +		}
    +	}
    +
    +
    +
    +	/**
    +	 * This method will block until the ApplicationMaster/JobManager have been
    +	 * deployed on YARN.
    +	 */
    +	protected AbstractFlinkYarnCluster deployInternal() throws Exception {
    +		isReadyForDeployment();
    +
    +		LOG.info("Using values:");
    +		LOG.info("\tTaskManager count = {}", taskManagerCount);
    +		LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
    +		LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
    +
    +		// Create application via yarnClient
    +		yarnApplication = yarnClient.createApplication();
    +		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    +
    +		// ------------------ Add dynamic properties to local flinkConfiguraton ------
    +
    +		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
    +		for (Tuple2<String, String> dynProperty : dynProperties) {
    +			flinkConfiguration.setString(dynProperty.f0, dynProperty.f1);
    +		}
    +
    +		// ------------------ Check if the specified queue exists --------------
    +
    +		try {
    +			List<QueueInfo> queues = yarnClient.getAllQueues();
    +			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
    +				boolean queueFound = false;
    +				for (QueueInfo queue : queues) {
    +					if (queue.getQueueName().equals(this.yarnQueue)) {
    +						queueFound = true;
    +						break;
    +					}
    +				}
    +				if (!queueFound) {
    +					String queueNames = "";
    +					for (QueueInfo queue : queues) {
    +						queueNames += queue.getQueueName() + ", ";
    +					}
    +					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
    +						"Available queues: " + queueNames);
    +				}
    +			} else {
    +				LOG.debug("The YARN cluster does not have any queues configured");
    +			}
    +		} catch(Throwable e) {
    +			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
    +			if(LOG.isDebugEnabled()) {
    +				LOG.debug("Error details", e);
    +			}
    +		}
    +
    +		// ------------------ Check if the YARN Cluster has the requested resources --------------
    +
    +		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
    +		// all allocations below this value are automatically set to this value.
    +		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
    +		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
    +			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
    +				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
    +				"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
    +				"you requested will start.");
    +		}
    +
    +		// set the memory to minAllocationMB to do the next checks correctly
    +		if(jobManagerMemoryMb < yarnMinAllocationMB) {
    +			jobManagerMemoryMb =  yarnMinAllocationMB;
    +		}
    +		if(taskManagerMemoryMb < yarnMinAllocationMB) {
    +			taskManagerMemoryMb =  yarnMinAllocationMB;
    +		}
    +
    +		Resource maxRes = appResponse.getMaximumResourceCapability();
    +		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    +		if(jobManagerMemoryMb > maxRes.getMemory() ) {
    +			failSessionDuringDeployment();
    +			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    +				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
    +		}
    +
    +		if(taskManagerMemoryMb > maxRes.getMemory() ) {
    +			failSessionDuringDeployment();
    +			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    +				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
    +		}
    +
    +		final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
    +			"connecting from the beginning because the resources are currently not available in the cluster. " +
    +			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
    +			"the resources become available.";
    +		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    +		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    +		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    +				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
    +
    +		}
    +		if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    +				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
    +		}
    +		if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    +				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
    +		}
    +
    +		// ----------------- check if the requested containers fit into the cluster.
    +
    +		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		// first, allocate the jobManager somewhere.
    +		if(!allocateResource(nmFree, jobManagerMemoryMb)) {
    +			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
    +				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    +				Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
    +		}
    +		// allocate TaskManagers
    +		for(int i = 0; i < taskManagerCount; i++) {
    +			if(!allocateResource(nmFree, taskManagerMemoryMb)) {
    +				LOG.warn("There is not enough memory available in the YARN cluster. " +
    +					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    +					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
    +					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + NOTE_RSC );
    +			}
    +		}
    +
    +		// ------------------ Prepare Application Master Container  ------------------------------
    +
    +		// respect custom JVM options in the YAML file
    +		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
    +
    +		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
    +		boolean hasLogback = new File(logbackFile).exists();
    +		String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
    +
    +		boolean hasLog4j = new File(log4jFile).exists();
    +		if(hasLogback) {
    +			shipFiles.add(new File(logbackFile));
    +		}
    +		if(hasLog4j) {
    +			shipFiles.add(new File(log4jFile));
    +		}
    +
    +		// Set up the container launch context for the application master
    +		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    +
    +		String amCommand = "$JAVA_HOME/bin/java"
    +			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " +javaOpts;
    +
    +		if(hasLogback || hasLog4j) {
    +			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-main.log\"";
    +
    +			if(hasLogback) {
    +				amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
    +			}
    +
    +			if(hasLog4j) {
    +				amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
    +			}
    +		}
    +
    +		amCommand 	+= " " + getApplicationMasterClass().getName() + " "
    --- End diff --
    
    indentation



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-146239375
  
    Thanks for the review @uce. I addressed your comments.
    
    I fixed the problem with Hadoop 2.6.0 by relocating Flink's Curator dependency to `org.apache.flink.shaded.org.apache.curator`. Furthermore, I bumped `flink-shaded-curator`'s Guava version to match Flink's Guava version, so that we don't bundle several different Guava versions in the resulting `flink-dist.jar`.
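
    The effect of the relocation can be sanity-checked with a small snippet along these lines (a hypothetical check, assuming the relocated package prefix above; not code from this PR):

    ```java
    // Load the relocated Curator entry point and print which jar it came from.
    public class ShadedCuratorCheck {
    	public static void main(String[] args) throws ClassNotFoundException {
    		Class<?> clazz = Class.forName(
    			"org.apache.flink.shaded.org.apache.curator.framework.CuratorFrameworkFactory");
    		// This should point at flink-dist.jar, not at Hadoop's yarn/lib directory,
    		// since an unshaded Curator on the classpath no longer matches this name.
    		System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
    	}
    }
    ```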



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-146232148
  
    Smooth experience now. Works as expected with vanilla YARN 2.6.0.
    
    +1 to merge.



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41387670
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, ActorRef}
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
    +import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
    +RequestLeaderSessionID}
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
    +AcknowledgeRegistration}
    +import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
    +import org.apache.flink.runtime.taskmanager.TaskManager
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
    +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
    +
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
    +  *
    --- End diff --
    
    Fixed



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-146170086
  
    I have a dependency problem with Curator leading to:
    
    ```bash
    ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
    java.lang.NoSuchMethodError: org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)Ljava/lang/String;
        at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:37)
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:113)
        at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:124)
        at org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:83)
        at org.apache.flink.runtime.util.ZooKeeperUtils.createLeaderElectionService(ZooKeeperUtils.java:145)
        at org.apache.flink.runtime.util.LeaderElectionUtils.createLeaderElectionService(LeaderElectionUtils.java:52)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:1595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1672)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:1629)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:1307)
        at org.apache.flink.yarn.ApplicationMasterBase.runAction(ApplicationMasterBase.scala:127)
        at org.apache.flink.yarn.ApplicationMasterBase$$anon$1.run(ApplicationMasterBase.scala:76)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:360)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1608)
        at org.apache.flink.yarn.ApplicationMasterBase.run(ApplicationMasterBase.scala:74)
        at org.apache.flink.yarn.ApplicationMaster$.main(ApplicationMaster.scala:35)
        at org.apache.flink.yarn.ApplicationMaster.main(ApplicationMaster.scala)
    ```
    
    Our application master's classpath:
    ```bash
    13:45:50,795 DEBUG org.apache.flink.yarn.ApplicationMaster                       - All environment variables: {PATH=/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/X11/bin, HADOOP_CONF_DIR=/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop, MAX_APP_ATTEMPTS=2, HADOOP_SECURE_DN_PID_DIR=, HADOOP_PID_DIR=, MAIL=/var/mail/ufuk, LD_LIBRARY_PATH=:/Users/ufuk/Downloads/hadoop-2.6.0/lib/native:/Users/ufuk/Downloads/hadoop-2.6.0/lib/native, LOGNAME=ufuk, JVM_PID=13307, _DETACHED=false, PWD=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004/container_1444217271951_0004_01_000001, HADOOP_YARN_USER=yarn, HADOOP_PREFIX=/Users/ufuk/Downloads/hadoop-2.6.0, LOCAL_DIRS=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004, YARN_IDENT_STRING=ufuk, HADOOP_SECURE_DN_LOG_DIR=/, SHELL=/bin/zsh, YARN_CONF_DIR=/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop, JAVA_MAIN_CLASS_10305=org.apache.hadoop.yarn.server.nodemanager.NodeManager, LOG_DIRS=/Users/ufuk/Downloads/hadoop-2.6.0/logs/userlogs/application_1444217271951_0004/container_1444217271951_0004_01_000001, _CLIENT_SHIP_FILES=file:/Users/ufuk/.flink/application_1444217271951_0004/flink-python-0.10-SNAPSHOT.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/log4j-1.2.17.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/slf4j-log4j12-1.7.7.jar,file:/Users/ufuk/.flink/application_1444217271951_0004/logback.xml,file:/Users/ufuk/.flink/application_1444217271951_0004/log4j.properties, _CLIENT_USERNAME=ufuk, HADOOP_YARN_HOME=/Users/ufuk/Downloads/hadoop-2.6.0, TMPDIR=/var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m00000gn/T/, HADOOP_DATANODE_OPTS=-Dhadoop.security.logger=ERROR,RFAS -Dhadoop.security.logger=ERROR,RFAS , HADOOP_SECONDARYNAMENODE_OPTS=-Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender , _FLINK_JAR_PATH=file:/Users/ufuk/.flink/application_1444217271951_0004/flink-dist-0.10-SNAPSHOT.jar, __CF_USER_TEXT_ENCODING=0x1F5:0x0:0x0, LC_CTYPE=UTF-8, _CLIENT_TM_COUNT=2, _CLIENT_TM_MEMORY=1024, SHLVL=3, HADOOP_IDENT_STRING=ufuk, YARN_ROOT_LOGGER=INFO,RFA, _SLOTS=-1, _CLIENT_HOME_DIR=file:/Users/ufuk, APP_SUBMIT_TIME_ENV=1444218347305, NM_HOST=192.168.178.69, _APP_ID=application_1444217271951_0004, YARN_LOGFILE=yarn-ufuk-nodemanager-vinci.local.log, HADOOP_SECURE_DN_USER=, HADOOP_CLASSPATH=/contrib/capacity-scheduler/*.jar:/contrib/capacity-scheduler/*.jar, HADOOP_HDFS_HOME=/Users/ufuk/Downloads/hadoop-2.6.0, HADOOP_MAPRED_HOME=/Users/ufuk/Downloads/hadoop-2.6.0, HADOOP_COMMON_HOME=/Users/ufuk/Downloads/hadoop-2.6.0, HADOOP_CLIENT_OPTS=-Xmx512m -Xmx512m , _=/bin/java, APPLICATION_WEB_PROXY_BASE=/proxy/application_1444217271951_0004, _STREAMING_MODE=false, NM_HTTP_PORT=8042, JAVA_MAIN_CLASS_13316=org.apache.flink.yarn.ApplicationMaster, HADOOP_OPTS= -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/Users/ufuk/Downloads/hadoop-2.6.0/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/Users/ufuk/Downloads/hadoop-2.6.0 -Dhadoop.id.str=ufuk -Dhadoop.root.logger=INFO,console -Djava.library.path=/Users/ufuk/Downloads/hadoop-2.6.0/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/Users/ufuk/Downloads/hadoop-2.6.0/logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/Users/ufuk/Downloads/hadoop-2.6.0 -Dhadoop.id.str=ufuk -Dhadoop.root.logger=INFO,console -Djava.library.path=/Users/ufuk/Downloads/hadoop-2.6.0/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true, SSH_CLIENT=::1 60015 22, NM_PORT=60017, USER=ufuk, CLASSPATH=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004/container_1444217271951_0004_01_000001/*:/Users/ufuk/Downloads/hadoop-2.6.0/etc/hadoop:/Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/common/*:/Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/common/lib/*:/Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/hdfs/*:/Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/yarn/*:/Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/yarn/lib/*, SSH_CONNECTION=::1 60015 ::1 22, HADOOP_TOKEN_FILE_LOCATION=/tmp/hadoop-ufuk/nm-local-dir/usercache/ufuk/appcache/application_1444217271951_0004/container_1444217271951_0004_01_000001/container_tokens, HADOOP_NFS3_OPTS=, HADOOP_NAMENODE_OPTS=-Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender -Dhadoop.security.logger=INFO,RFAS -Dhdfs.audit.logger=INFO,NullAppender , YARN_NICENESS=0, HOME=/home/, CONTAINER_ID=container_1444217271951_0004_01_000001, HADOOP_PORTMAP_OPTS=-Xmx512m -Xmx512m , MALLOC_ARENA_MAX=4}
    ```
    
    The problem is */Users/ufuk/Downloads/hadoop-2.6.0/share/hadoop/yarn/lib/**, which contains Curator 2.6.
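
    A quick way to confirm which Curator wins is to print the code source of the class from the stack trace (a minimal diagnostic sketch, assuming it runs on the same classpath as the application master; not code from this PR):

    ```java
    // Print the jar that org.apache.curator.utils.PathUtils was loaded from.
    public class CuratorOriginCheck {
    	public static void main(String[] args) throws ClassNotFoundException {
    		Class<?> clazz = Class.forName("org.apache.curator.utils.PathUtils");
    		// On this setup it points into share/hadoop/yarn/lib, i.e. Hadoop's
    		// Curator 2.6 shadows the version bundled with Flink.
    		System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
    	}
    }
    ```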
    
    I've used vanilla YARN 2.6.0. Robert, for whom it worked, used CDH 5.4.
    




[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-145577205
  
    I killed the AM using "kill" from the command line, and all the TaskManagers shut down as well.




[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41387700
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---
    @@ -0,0 +1,867 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.client.CliFrontend;
    +import org.apache.flink.client.FlinkYarnSessionCli;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
    +import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsAction;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.hadoop.yarn.api.records.ApplicationReport;
    +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    +import org.apache.hadoop.yarn.api.records.LocalResource;
    +import org.apache.hadoop.yarn.api.records.NodeReport;
    +import org.apache.hadoop.yarn.api.records.NodeState;
    +import org.apache.hadoop.yarn.api.records.QueueInfo;
    +import org.apache.hadoop.yarn.api.records.Resource;
    +import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
    +import org.apache.hadoop.yarn.client.api.YarnClient;
    +import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.exceptions.YarnException;
    +import org.apache.hadoop.yarn.util.Records;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.PrintStream;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    +* All classes in this package contain code taken from
    +* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
    +* and
    +* https://github.com/hortonworks/simple-yarn-app
    +* and
    +* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
    +*
    +* The Flink jar is uploaded to HDFS by this client.
    +* The application master and all the TaskManager containers get the jar file downloaded
    +* by YARN into their local fs.
    +*
    +*/
    +public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
    +
    +	/**
    +	 * Constants,
    +	 * all starting with ENV_ are used as environment variables to pass values from the Client
    +	 * to the Application Master.
    +	 */
    +	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
    +	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
    +	public final static String ENV_APP_ID = "_APP_ID";
    +	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
    +	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
    +	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
    +	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
    +	public static final String ENV_SLOTS = "_SLOTS";
    +	public static final String ENV_DETACHED = "_DETACHED";
    +	public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
    +	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
    +
    +
    +	/**
    +	 * Minimum memory requirements, checked by the Client.
    +	 */
    +	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
    +	private static final int MIN_TM_MEMORY = 768;
    +
    +	private Configuration conf;
    +	private YarnClient yarnClient;
    +	private YarnClientApplication yarnApplication;
    +
    +
    +	/**
    +	 * Files (usually in a distributed file system) used for the YARN session of Flink.
    +	 * Contains configuration files and jar files.
    +	 */
    +	private Path sessionFilesDir;
    +
    +	/**
    +	 * If the user has specified a different number of slots, we store them here
    +	 */
    +	private int slots = -1;
    +
    +	private int jobManagerMemoryMb = 1024;
    +
    +	private int taskManagerMemoryMb = 1024;
    +
    +	private int taskManagerCount = 1;
    +
    +	private String yarnQueue = null;
    +
    +	private String configurationDirectory;
    +
    +	private Path flinkConfigurationPath;
    +
    +	private Path flinkLoggingConfigurationPath; // optional
    +
    +	private Path flinkJarPath;
    +
    +	private String dynamicPropertiesEncoded;
    +
    +	private List<File> shipFiles = new ArrayList<File>();
    +	private org.apache.flink.configuration.Configuration flinkConfiguration;
    +
    +	private boolean detached;
    +	private boolean streamingMode;
    +
    +	private String customName = null;
    +
    +	public FlinkYarnClientBase() {
    +		conf = new YarnConfiguration();
    +		if(this.yarnClient == null) {
    +			// Create yarnClient
    +			yarnClient = YarnClient.createYarnClient();
    +			yarnClient.init(conf);
    +			yarnClient.start();
    +		}
    +
    +		// for unit tests only
    +		if(System.getenv("IN_TESTS") != null) {
    +			try {
    +				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
    +			} catch (Throwable t) {
    +				throw new RuntimeException("Error",t);
    +			}
    +		}
    +	}
    +
    +	protected abstract Class<?> getApplicationMasterClass();
    +
    +	@Override
    +	public void setJobManagerMemory(int memoryMb) {
    +		if(memoryMb < MIN_JM_MEMORY) {
    +			throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
    +				+ "of " + MIN_JM_MEMORY+ " MB");
    +		}
    +		this.jobManagerMemoryMb = memoryMb;
    +	}
    +
    +	@Override
    +	public void setTaskManagerMemory(int memoryMb) {
    +		if(memoryMb < MIN_TM_MEMORY) {
    +			throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
    +				+ "of " + MIN_TM_MEMORY+ " MB");
    +		}
    +		this.taskManagerMemoryMb = memoryMb;
    +	}
    +
    +	@Override
    +	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
    +		this.flinkConfiguration = conf;
    +	}
    +
    +	@Override
    +	public void setTaskManagerSlots(int slots) {
    +		if(slots <= 0) {
    +			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
    +		}
    +		this.slots = slots;
    +	}
    +
    +	@Override
    +	public int getTaskManagerSlots() {
    +		return this.slots;
    +	}
    +
    +	@Override
    +	public void setQueue(String queue) {
    +		this.yarnQueue = queue;
    +	}
    +
    +	@Override
    +	public void setLocalJarPath(Path localJarPath) {
    +		if(!localJarPath.toString().endsWith("jar")) {
    +			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
    +		}
    +		this.flinkJarPath = localJarPath;
    +	}
    +
    +	@Override
    +	public void setConfigurationFilePath(Path confPath) {
    +		flinkConfigurationPath = confPath;
    +	}
    +
    +	public void setConfigurationDirectory(String configurationDirectory) {
    +		this.configurationDirectory = configurationDirectory;
    +	}
    +
    +	@Override
    +	public void setFlinkLoggingConfigurationPath(Path logConfPath) {
    +		flinkLoggingConfigurationPath = logConfPath;
    +	}
    +
    +	@Override
    +	public Path getFlinkLoggingConfigurationPath() {
    +		return flinkLoggingConfigurationPath;
    +	}
    +
    +	@Override
    +	public void setTaskManagerCount(int tmCount) {
    +		if(tmCount < 1) {
    +			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
    +		}
    +		this.taskManagerCount = tmCount;
    +	}
    +
    +	@Override
    +	public int getTaskManagerCount() {
    +		return this.taskManagerCount;
    +	}
    +
    +	@Override
    +	public void setShipFiles(List<File> shipFiles) {
    +		for(File shipFile: shipFiles) {
    +			// remove uberjar from ship list (by default everything in the lib/ folder is added to
    +			// the list of files to ship, but we handle the uberjar separately.
    +			// the list of files to ship, but we handle the uberjar separately).
    +				this.shipFiles.add(shipFile);
    +			}
    +		}
    +	}
    +
    +	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
    +		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
    +	}
    +
    +	@Override
    +	public String getDynamicPropertiesEncoded() {
    +		return this.dynamicPropertiesEncoded;
    +	}
    +
    +
    +	public void isReadyForDeployment() throws YarnDeploymentException {
    +		if(taskManagerCount <= 0) {
    +			throw new YarnDeploymentException("Taskmanager count must be positive");
    +		}
    +		if(this.flinkJarPath == null) {
    +			throw new YarnDeploymentException("The Flink jar path is null");
    +		}
    +		if(this.configurationDirectory == null) {
    +			throw new YarnDeploymentException("Configuration directory not set");
    +		}
    +		if(this.flinkConfigurationPath == null) {
    +			throw new YarnDeploymentException("Configuration path not set");
    +		}
    +		if(this.flinkConfiguration == null) {
    +			throw new YarnDeploymentException("Flink configuration object has not been set");
    +		}
    +
    +		// check if required Hadoop environment variables are set. If not, warn user
    +		if(System.getenv("HADOOP_CONF_DIR") == null &&
    +			System.getenv("YARN_CONF_DIR") == null) {
    +			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
    +				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
    +				"configuration for accessing YARN.");
    +		}
    +	}
    +
    +	public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
    +		for(int i = 0; i < nodeManagers.length; i++) {
    +			if(nodeManagers[i] >= toAllocate) {
    +				nodeManagers[i] -= toAllocate;
    +				return true;
    +			}
    +		}
    +		return false;
    +	}
    +
    +	@Override
    +	public void setDetachedMode(boolean detachedMode) {
    +		this.detached = detachedMode;
    +	}
    +
    +	@Override
    +	public boolean isDetached() {
    +		return detached;
    +	}
    +
    +	public AbstractFlinkYarnCluster deploy() throws Exception {
    +
    +		UserGroupInformation.setConfiguration(conf);
    +		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    +
    +		if (UserGroupInformation.isSecurityEnabled()) {
    +			if (!ugi.hasKerberosCredentials()) {
    +				throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
    +					"You may use kinit to authenticate and request a TGT from the Kerberos server.");
    +			}
    +			return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
    +				@Override
    +				public AbstractFlinkYarnCluster run() throws Exception {
    +					return deployInternal();
    +				}
    +			});
    +		} else {
    +			return deployInternal();
    +		}
    +	}
    +
    +
    +
    +	/**
    +	 * This method will block until the ApplicationMaster/JobManager have been
    +	 * deployed on YARN.
    +	 */
    +	protected AbstractFlinkYarnCluster deployInternal() throws Exception {
    +		isReadyForDeployment();
    +
    +		LOG.info("Using values:");
    +		LOG.info("\tTaskManager count = {}", taskManagerCount);
    +		LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
    +		LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
    +
    +		// Create application via yarnClient
    +		yarnApplication = yarnClient.createApplication();
    +		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    +
    +		// ------------------ Add dynamic properties to local flinkConfiguration ------
    +
    +		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
    +		for (Tuple2<String, String> dynProperty : dynProperties) {
    +			flinkConfiguration.setString(dynProperty.f0, dynProperty.f1);
    +		}
    +
    +		// ------------------ Check if the specified queue exists --------------
    +
    +		try {
    +			List<QueueInfo> queues = yarnClient.getAllQueues();
    +			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
    +				boolean queueFound = false;
    +				for (QueueInfo queue : queues) {
    +					if (queue.getQueueName().equals(this.yarnQueue)) {
    +						queueFound = true;
    +						break;
    +					}
    +				}
    +				if (!queueFound) {
    +					String queueNames = "";
    +					for (QueueInfo queue : queues) {
    +						queueNames += queue.getQueueName() + ", ";
    +					}
    +					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
    +						"Available queues: " + queueNames);
    +				}
    +			} else {
    +				LOG.debug("The YARN cluster does not have any queues configured");
    +			}
    +		} catch(Throwable e) {
    +			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
    +			if(LOG.isDebugEnabled()) {
    +				LOG.debug("Error details", e);
    +			}
    +		}
    +
    +		// ------------------ Check if the YARN Cluster has the requested resources --------------
    +
    +		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
    +		// all allocations below this value are automatically set to this value.
    +		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
    +		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
    +			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
    +				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size. " +
    +				"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
    +				"you requested will start.");
    +		}
    +
    +		// set the memory to minAllocationMB to do the next checks correctly
    +		if(jobManagerMemoryMb < yarnMinAllocationMB) {
    +			jobManagerMemoryMb =  yarnMinAllocationMB;
    +		}
    +		if(taskManagerMemoryMb < yarnMinAllocationMB) {
    +			taskManagerMemoryMb =  yarnMinAllocationMB;
    +		}
    +
    +		Resource maxRes = appResponse.getMaximumResourceCapability();
    +		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    +		if(jobManagerMemoryMb > maxRes.getMemory() ) {
    +			failSessionDuringDeployment();
    +			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    +				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
    +		}
    +
    +		if(taskManagerMemoryMb > maxRes.getMemory() ) {
    +			failSessionDuringDeployment();
    +			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    +				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
    +		}
    +
    +		final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
    +			"connecting from the beginning because the resources are currently not available in the cluster. " +
    +			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
    +			"the resources become available.";
    +		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    +		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    +		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    +				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
    +
    +		}
    +		if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    +				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
    +		}
    +		if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    +				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
    +		}
    +
    +		// ----------------- check if the requested containers fit into the cluster.
    +
    +		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		// first, allocate the jobManager somewhere.
    +		if(!allocateResource(nmFree, jobManagerMemoryMb)) {
    +			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
    +				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    +				Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
    +		}
    +		// allocate TaskManagers
    +		for(int i = 0; i < taskManagerCount; i++) {
    +			if(!allocateResource(nmFree, taskManagerMemoryMb)) {
    +				LOG.warn("There is not enough memory available in the YARN cluster. " +
    +					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    +					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
    +					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + NOTE_RSC );
    +			}
    +		}
    +
    +		// ------------------ Prepare Application Master Container  ------------------------------
    +
    +		// respect custom JVM options in the YAML file
    +		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
    +
    +		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
    +		boolean hasLogback = new File(logbackFile).exists();
    +		String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
    +
    +		boolean hasLog4j = new File(log4jFile).exists();
    +		if(hasLogback) {
    +			shipFiles.add(new File(logbackFile));
    +		}
    +		if(hasLog4j) {
    +			shipFiles.add(new File(log4jFile));
    +		}
    +
    +		// Set up the container launch context for the application master
    +		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    +
    +		String amCommand = "$JAVA_HOME/bin/java"
    +			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " +javaOpts;
    +
    +		if(hasLogback || hasLog4j) {
    +			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-main.log\"";
    +
    +			if(hasLogback) {
    +				amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
    +			}
    +
    +			if(hasLog4j) {
    +				amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
    +			}
    +		}
    +
    +		amCommand 	+= " " + getApplicationMasterClass().getName() + " "
    --- End diff --
    
    Fixed



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1213#issuecomment-145456411
  
    I was just curious whether he killed them gracefully with a `PoisonPill` or by killing the JVM process.
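
    For reference, the two modes differ roughly as sketched below (illustrative only; `jobManager` stands in for an ActorRef to the application master's JobManager actor):

    ```java
    import akka.actor.ActorRef;
    import akka.actor.PoisonPill;

    public class ShutdownModes {
    	// Graceful stop: the PoisonPill is enqueued like a normal message, so
    	// the actor drains its mailbox and then terminates cleanly.
    	static void stopGracefully(ActorRef jobManager) {
    		jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
    	}
    	// Hard kill: terminating the JVM (e.g. `kill <pid>` on the AM process)
    	// skips any cleanup, which is the failure mode the HA support has to
    	// survive.
    }
    ```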



[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1213#discussion_r41374549
  
    --- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testingUtils
    +
    +import akka.actor.{Terminated, ActorRef}
    +import org.apache.flink.runtime.FlinkActor
    +import org.apache.flink.runtime.execution.ExecutionState
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
    +import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
    +RequestLeaderSessionID}
    +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
    +import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
    +AcknowledgeRegistration}
    +import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
    +import org.apache.flink.runtime.taskmanager.TaskManager
    +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
    +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
    +CheckIfJobRemoved, Alive}
    +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
    +
    +import scala.concurrent.duration._
    +
    +import language.postfixOps
    +
    +/** This mixin can be used to decorate a TaskManager with messages for testing purposes.
    +  *
    --- End diff --
    
    empty line

