Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2014/10/11 20:40:39 UTC

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/incubator-flink/pull/149

    [FLINK-1019] Implementation of akka based RPC system

    Replaced the old Nephele RPC service with an Akka-based system. As a result, several components are now implemented as actors, including the JobManager, TaskManager, MemoryArchivist, and JobClient. The legacy RPC service and the corresponding protocols have been removed.
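    
    As an illustration of the new style, here is a minimal, self-contained sketch (toy names, not the actual Flink classes): state is owned by an actor and queried or modified only via messages.
    
        import akka.actor._
        
        // Toy example: components react to messages instead of exposing RPC methods.
        case object RequestNumberRegisteredTaskManager
        
        class ToyJobManager extends Actor {
          private var registeredTaskManagers = 0
        
          def receive = {
            case "register" =>
              registeredTaskManagers += 1
              sender() ! "ack"                  // reply goes back to the asking actor
            case RequestNumberRegisteredTaskManager =>
              sender() ! registeredTaskManagers // state is read via a message, too
          }
        }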
    
    Also replaced the execution service of the ExecutionGraph with Akka's futures to unify the system.
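    
    The pattern, sketched with plain scala.concurrent and nothing Flink-specific:
    
        import scala.concurrent.{Await, Future}
        import scala.concurrent.ExecutionContext.Implicits.global
        import scala.concurrent.duration._
        
        object FutureDemo extends App {
          // asynchronous work is submitted to the implicit execution context
          val deployments: Seq[Future[Unit]] =
            (1 to 4).map(i => Future { println(s"deploying vertex $i") })
        
          // block only for the demo; real code would chain callbacks instead
          Await.result(Future.sequence(deployments), 10.seconds)
        }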
    
    Removed the LocalInstanceManager, whose task is now handled by the InstanceManager. The responsibility for creating local task managers is now delegated to the FlinkMiniCluster.
    
    The EventCollector and the respective event classes were removed. Events are now sent directly to the respective listeners.
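    
    A toy version of the direct-listener pattern (class and message names made up for illustration):
    
        import akka.actor.ActorRef
        
        case class JobStatusChanged(newStatus: String)
        
        class ToyExecutionGraph {
          private var listeners = List.empty[ActorRef]
        
          def registerJobStatusListener(listener: ActorRef): Unit =
            listeners ::= listener
        
          // events are pushed straight to the registered actors
          def notifyStatusChange(status: String): Unit =
            listeners.foreach(_ ! JobStatusChanged(status))
        }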
    
    Moved the resources of the WebInfoServer and the WebInterfaceServer to the resource folders of the corresponding projects. As a consequence, these resources are bundled with the jars and served directly from them by Jetty.
    
    The YARN client was adapted to communicate with the actors. The former ApplicationMaster is combined with the JobManager to simplify the system. The uber jar is now created with Maven's shade plugin.
    
    Since this is a big change, I would be happy if another pair of eyes could take a look at it.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/incubator-flink akka_scala

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/149.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #149
    
----
commit 98d15ca6380263c980d34038f9772f49abb4a5ba
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-03T12:53:21Z

    Added configuration constants for Akka's actor system.

commit 68f096e8017b5bb5b345d20b26fcba5103b2a451
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-04T09:32:59Z

    Reworked the NepheleMiniCluster logic
    
    Added further communication logic to the JobManager and TaskManager.

commit 963709a9c6194bd54cd2dd5429643c8c91cce8a7
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-05T09:17:16Z

    Reworked the EventCollector and the ArchiveListener as actors.
    
    Replaced the EventCollector and the MemoryArchivist.
    
    Finished the EventCollector and adjusted the ExecutionGraph and ExecutionVertex to register actors as listeners.

commit da838966c8f34ef4f90dd0422cf65c90275b88aa
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-11T15:47:28Z

    Reworked the TaskManager.
    
    Changed RuntimeEnvironment and TaskInputSplitProvider to work with ActorRefs.

commit 509a7770ae9c13528d12f9d2be44541ad085a0af
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-12T12:48:56Z

    Implemented JobManagerProfiler and TaskManagerProfiler as actors.
    
    Renamed NepheleMiniCluster to FlinkMiniCluster.

commit ab86da20b0fa6f7e2559dd6e641b80a172641beb
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-12T15:46:02Z

    Removed proxies from the JobClient and CliFrontend.

commit 1cf8230c6ccf6371fe4e15785672e1ede91d6313
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-15T12:07:42Z

    Removed old RPC service.

commit 545c5ff0eb7eceeb850a930e8e3db9908bb8ec26
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-18T15:09:31Z

    Ported the JobManagerITCase to the actor implementation.

commit d70f71a24ba36d79e78355815629afef6abde1f0
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-19T12:48:04Z

    Updated all JobManager and TaskManager relevant test cases to work with actors.

commit 2e7bff1f4309b83bf31cb27072a24f925038222b
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-22T14:14:03Z

    Removed legacy protocol classes.

commit 19ac6da337ed952ccd0a88a30942b962b1b3791b
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-23T15:28:39Z

    Updated license headers of Scala files to conform to scala-checkstyle.

commit b984abf455becf31d221af94bf3a1144696cda4b
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-09-26T15:27:35Z

    Removed the execution service from the execution graph and replaced it with Akka's futures.

commit a3e630249579deeba397641c70f19a6ad1c85262
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-02T09:19:52Z

    Replaced the JobClient with an actor.

commit 3823d091e70dc59ce9c86ab81b145d7a20a9df3c
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-05T20:58:49Z

    Removed old java implementations of the JobManager, TaskManager, JobClient, EventCollector, TaskOperationResult and MemoryArchivist.

commit 6da8db9542346a6bb89d68ac2e7ba74c364fe06a
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-06T12:45:37Z

    Adapted Webserver to work with actors.

commit aa49195bbe7d292b00a2fffa7bcdf6e668fb1227
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-06T13:55:11Z

    Added the webserver to the job manager.

commit 749a0a8537cc86420bd8b1172f9fbbb9048e1dc9
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-07T15:00:56Z

    Removed old events. Adapted webserver communication so that it can now talk to the actor implementations.

commit 538e1aa4c5fd04f4f2e7f9d98bcd94b9989bc4f0
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-08T10:10:25Z

    Added AKKA_LOG_LEVEL config constant.

commit 6ad2e36a409a48d7c6a7ef0b71b6409d011a8a89
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-08T12:47:43Z

    Added Scaladoc to actor messages.

commit f4d940d749731bd6999df44b5494c701e2a3d985
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-08T13:48:15Z

    Removed listen parameter from SubmitJobDetached message.

commit 5b1b33c5af5ab4608d7c6d0fcc592ac80da646e9
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-09T17:24:39Z

    Reworked the YARN client to use actor-based communication.

commit 027829d7d17549c2421d2e0db9fd4b08ddb5751b
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-10T13:50:27Z

    Moved WebInfoServer and WebInterfaceServer resources into the resource folders of the respective projects. Jetty uses the jar as its base directory.

commit 7bd5b1a34c7507dfcc136a54ffdb9234f806f0a7
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-10T23:08:42Z

    Implemented proper shutdown of YARN containers if the system is shut down via the YARN client.

commit d8e307cbf710c60406b270a08b102624c5145643
Author: Till Rohrmann <tr...@apache.org>
Date:   2014-10-11T18:37:59Z

    Resolved scala checkstyle issues.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-62362286
  
    What are the plans for merging this change?


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-67422487
  
    Thanks for the feedback, Ufuk. I tried to address the points you mentioned. Concerning the failing Travis builds, it turned out to be a race condition in the execution graph that I stumbled upon. Due to this race condition, it was possible for a job to finish before all vertices had properly reached a finished state. It is fixed with the latest commit.
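    
    For illustration, the invariant the fix has to enforce looks roughly like this (toy code, not the actual patch):
    
        sealed trait VertexState
        case object Running extends VertexState
        case object Finished extends VertexState
        
        class ToyGraph(numVertices: Int) {
          private val states = Array.fill[VertexState](numVertices)(Running)
        
          // Returns true only when the last vertex reaches Finished. The
          // synchronization prevents the job from being declared finished
          // while a concurrent state transition is still in flight.
          def vertexFinished(index: Int): Boolean = synchronized {
            states(index) = Finished
            states.forall(_ == Finished)
          }
        }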


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22015946
  
    --- Diff: .travis.yml ---
    @@ -1,7 +1,15 @@
     # s3 deployment based on http://about.travis-ci.org/blog/2012-12-18-travis-artifacts/
     
    -language: java
    +# send to container based infrastructure: http://docs.travis-ci.com/user/workers/container-based-infrastructure/
    +sudo: false
    --- End diff --
    
    This actually only enables the container-based Travis builds. I hope that this has no other effects.


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-62364769
  
    There are some minor issues that we discovered when going over the code with Stephan. I have to address them first. Among others, these include:
    
    * Local mode for the JobManager in main
    * Better error messages when the LibraryCacheManager (or BlobManager) fails
    * GlobalExecutionContext: limit the number of created threads?
    * TaskManager: watch the JobManager in case of a network error or a JobManager crash (see the sketch after this list)
    * Check exception handling in the ExecutionGraph
    * Expose Akka timeouts via the config
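    
    For the TaskManager point, a minimal runnable sketch of Akka's DeathWatch mechanism (actor names are illustrative):
    
        import akka.actor._
        
        class Watcher(target: ActorRef) extends Actor {
          context.watch(target) // delivers Terminated(target) if it stops or becomes unreachable
        
          def receive = {
            case Terminated(`target`) =>
              println(s"JobManager ${target.path} is gone; re-register or shut down here.")
              context.stop(self)
          }
        }
        
        object DeathWatchDemo extends App {
          val system = ActorSystem("demo")
          val jobManager = system.actorOf(Props.empty, "jobmanager")
          system.actorOf(Props(classOf[Watcher], jobManager), "watcher")
          system.stop(jobManager) // simulate a crash; the watcher receives Terminated
          Thread.sleep(500)
          system.shutdown()       // Akka 2.3-era shutdown call
        }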


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-64166751
  
    Yes, it is. But there are still some performance issues that I have to figure out first.


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22016004
  
    --- Diff: flink-addons/flink-avro/src/test/resources/log4j-test.properties ---
    @@ -16,4 +16,12 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF
    \ No newline at end of file
    +# Set root logger level to DEBUG and its only appender to A1.
    +log4j.rootLogger=OFF, A1
    --- End diff --
    
    Yeah, we were kind of lazy, only inserting an empty file or one with the root logger set to OFF.


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22015969
  
    --- Diff: .travis.yml ---
    @@ -44,12 +52,9 @@ before_script:
        - "gem install --no-document --version 0.8.9 faraday "
        - "gem install --no-document travis-artifacts & "
     
    -
    -install: true
    -
     # we have to manually set the forkCount because maven thinks that the travis
     # machine has 32 cores
    -script: "mvn -Dflink.forkCount=4 -B $PROFILE clean install verify"
    +script: "mvn -Dflink.forkCount=2 -B $PROFILE clean install verify"
    --- End diff --
    
    The problem was that with a higher fork count, Travis killed the JVMs because they ran out of resources.


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r21962682
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---
    @@ -18,29 +18,38 @@
     
     package org.apache.flink.client;
     
    +import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertTrue;
     import static org.junit.Assert.fail;
     
    -import java.io.IOException;
    -import java.util.ArrayList;
    -import java.util.List;
    -
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +import akka.actor.Status;
    +import akka.actor.UntypedActor;
    +import akka.testkit.JavaTestKit;
     import org.apache.commons.cli.CommandLine;
    -import org.apache.flink.client.CliFrontend;
    -import org.apache.flink.runtime.client.JobCancelResult;
    -import org.apache.flink.runtime.client.JobProgressResult;
    -import org.apache.flink.runtime.client.JobSubmissionResult;
    -import org.apache.flink.runtime.event.job.AbstractEvent;
    -import org.apache.flink.runtime.event.job.RecentJobEvent;
    -import org.apache.flink.runtime.jobgraph.JobGraph;
     import org.apache.flink.runtime.jobgraph.JobID;
    -import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
    -import org.apache.flink.runtime.types.IntegerRecord;
    -import org.junit.Assert;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.junit.AfterClass;
     import org.junit.BeforeClass;
     import org.junit.Test;
     
    +//TODO: Update test case
    --- End diff --
    
    Is this TODO still valid?


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r21970914
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---
    +//TODO: Update test case
    --- End diff --
    
    Good catch. I forgot to delete it.


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r21395124
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -0,0 +1,518 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager
    +
    +import java.io.File
    +import java.net.{InetSocketAddress}
    +import java.util.concurrent.TimeUnit
    +
    +import akka.actor._
    +import akka.pattern.Patterns
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
    +import org.apache.flink.core.io.InputSplitAssigner
    +import org.apache.flink.runtime.blob.BlobServer
    +import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
    +import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
    +import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.taskmanager.TaskManager
    +import org.apache.flink.runtime.{JobException, ActorLogMessages}
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
    +import org.apache.flink.runtime.instance.{InstanceManager}
    +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
    +import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
    +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
    +import org.apache.flink.runtime.messages.JobManagerMessages._
    +import org.apache.flink.runtime.messages.RegistrationMessages._
    +import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, Heartbeat}
    +import org.apache.flink.runtime.profiling.ProfilingUtils
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.convert.WrapAsScala
    +import scala.concurrent.{Future}
    +import scala.concurrent.duration._
    +
    +class JobManager(val configuration: Configuration) extends
    +Actor with ActorLogMessages with ActorLogging with WrapAsScala {
    +  import context._
    +  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
    +    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
    +
    +  Execution.timeout = timeout;
    +
    +  log.info("Starting job manager.")
    +
    +  val (archiveCount,
    +    profiling,
    +    cleanupInterval,
    +    defaultExecutionRetries,
    +    delayBetweenRetries) = JobManager.parseConfiguration(configuration)
    +
    +  // Props for the profiler actor
    +  def profilerProps: Props = Props(classOf[JobManagerProfiler])
    +
    +  // Props for the archive actor
    +  def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
    +
    +  val profiler = profiling match {
    +    case true => Some(context.actorOf(profilerProps, JobManager.PROFILER_NAME))
    +    case false => None
    +  }
    +
    +  val archive = context.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
    +
    +  val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
    +  val instanceManager = new InstanceManager()
    +  val scheduler = new FlinkScheduler()
    +  val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval)
    +
    +  // List of current jobs running
    +  val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
    +
    +  // Map of actors which want to be notified once a specific job terminates
    +  val finalJobStatusListener = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +
    +  instanceManager.addInstanceListener(scheduler)
    +
    +  log.info(s"Started job manager. Waiting for incoming messages.")
    +
    +  override def postStop(): Unit = {
    +    log.info(s"Stopping job manager ${self.path}.")
    +    instanceManager.shutdown()
    +    scheduler.shutdown()
    +    libraryCacheManager.shutdown()
    +  }
    +
    +  override def receiveWithLogMessages: Receive = {
    +    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => {
    +      val taskManager = sender()
    +      val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
    +        hardwareInformation, numberOfSlots)
    +
    +      // to be notified when the taskManager is no longer reachable
    +//      context.watch(taskManager);
    +
    +      taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
    +    }
    +
    +    case RequestNumberRegisteredTaskManager => {
    +      sender() ! instanceManager.getNumberOfRegisteredTaskManagers
    +    }
    +
    +    case RequestTotalNumberOfSlots => {
    +      sender() ! instanceManager.getTotalNumberOfSlots
    +    }
    +
    +    case SubmitJob(jobGraph, listenToEvents, detach) => {
    +      try {
    +        if (jobGraph == null) {
    +          sender() ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" +
    +            " null."))
    +        } else {
    +
    +          log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}}).")
    +
    +          // Create the user code class loader
    +          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
    +
    +          val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
    +            (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
    +              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(),
    +              System.currentTimeMillis())))
    +
    +          val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries >= 0){
    +            jobGraph.getNumberOfExecutionRetries
    +          }else{
    +            defaultExecutionRetries
    +          }
    +
    +          executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
    +          executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
    +
    +          val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
    +
    +          if (userCodeLoader == null) {
    +            throw new JobException("The user code class loader could not be initialized.")
    +          }
    +
    +          log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
    +            .getName}).")
    +
    +          for (vertex <- jobGraph.getVertices) {
    +            val executableClass = vertex.getInvokableClassName
    +            if (executableClass == null || executableClass.length == 0) {
    +              throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
    +                s"invokable class.")
    +            }
    +
    +            vertex.initializeOnMaster(userCodeLoader)
    +          }
    +
    +          // topological sorting of the job vertices
    +          val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
    +
    +          log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
    +            .getJobID} (${jobGraph.getName}).")
    +
    +          executionGraph.attachJobGraph(sortedTopology)
    +
    +          log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
    +            s"(${jobGraph.getName}).")
    +
    +          executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
    +
    +          // get notified about job status changes
    +          executionGraph.registerJobStatusListener(self)
    +
    +          if(listenToEvents){
    +            // the sender will be notified about state changes
    +            executionGraph.registerExecutionListener(sender())
    +            executionGraph.registerJobStatusListener(sender())
    +          }
    +
    +          jobInfo.detach = detach
    +
    +          log.info(s"Scheduling job ${jobGraph.getName}.")
    +
    +          executionGraph.scheduleForExecution(scheduler)
    +
    +          sender() ! SubmissionSuccess(jobGraph.getJobID)
    +        }
    +      } catch {
    +        case t: Throwable =>
    +          log.error(t, "Job submission failed.")
    +
    +          currentJobs.get(jobGraph.getJobID) match {
    +            case Some((executionGraph, jobInfo)) =>
    +              executionGraph.fail(t)
    +
    +              // don't send the client the final job status because we already send him
    +              // SubmissionFailure
    +              jobInfo.detach = true
    +
    +              val status = Patterns.ask(self, RequestFinalJobStatus(jobGraph.getJobID), 10 second)
    +              status.onFailure{
    +                case _: Throwable => self ! JobStatusChanged(executionGraph.getJobID,
    +                  JobStatus.FAILED, System.currentTimeMillis(),
    +                  s"Cleanup job ${jobGraph.getJobID}.")
    +              }
    +            case None =>
    +              libraryCacheManager.unregisterJob(jobGraph.getJobID)
    +              currentJobs.remove(jobGraph.getJobID)
    +
    +          }
    +
    +          sender() ! SubmissionFailure(jobGraph.getJobID, t)
    +      }
    +    }
    +
    +    case CancelJob(jobID) => {
    +      log.info(s"Trying to cancel job with ID ${jobID}.")
    +
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, _)) =>
    +          Future {
    +            executionGraph.cancel()
    +          }
    +          sender() ! CancellationSuccess(jobID)
    +        case None =>
    +          log.info(s"No job found with ID ${jobID}.")
    +          sender() ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " +
    +            s"ID ${jobID}."))
    +      }
    +    }
    +
    +    case UpdateTaskExecutionState(taskExecutionState) => {
    +      if(taskExecutionState == null){
    +        sender() ! false
    +      }else {
    +        currentJobs.get(taskExecutionState.getJobID) match {
    +          case Some((executionGraph, _)) =>
    +            sender() ! executionGraph.updateState(taskExecutionState)
    +          case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
    +            .getJobID} to change state to ${taskExecutionState.getExecutionState}.")
    +            sender() ! false
    +        }
    +      }
    +    }
    +
    +    case RequestNextInputSplit(jobID, vertexID) => {
    +      val nextInputSplit = currentJobs.get(jobID) match {
    +        case Some((executionGraph,_)) => executionGraph.getJobVertex(vertexID) match {
    +          case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
    +            case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
    --- End diff --
    
    When passing a `null` hostname here, input split localization is impossible.
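    
    A sketch of what locality-aware assignment needs (interfaces stubbed for illustration; not Flink's exact API):
    
        trait InputSplit
        
        trait InputSplitAssigner {
          // a non-null host lets the assigner prefer splits stored on that machine
          def getNextInputSplit(host: String): InputSplit
        }
        
        def nextSplitFor(assigner: InputSplitAssigner, taskManagerHost: String): InputSplit =
          assigner.getNextInputSplit(taskManagerHost) // instead of null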


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-58857878
  
    I vote to merge this rather soon:
     - The pull request touches A LOT of code, so it will quickly become incompatible with the rest of the system.
     - We are going to merge this anyway. The only thing we need to ensure here is that the change does not immediately break our system. Minor bugfixes will come in the next few days anyway.


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-67417589
  
    Looks like a big-ass change ;-) My inline comments were more or less random. I just wanted to get a feeling for the changes.
    
    I can have a look in the next few days. :-)


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22013663
  
    --- Diff: .travis.yml ---
    -script: "mvn -Dflink.forkCount=4 -B $PROFILE clean install verify"
    +script: "mvn -Dflink.forkCount=2 -B $PROFILE clean install verify"
    --- End diff --
    
    Did you have better experience with fewer forks?


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22013639
  
    --- Diff: .travis.yml ---
    +sudo: false
    --- End diff --
    
    From the offline chat we had about a weird problem when testing on Travis: probably just a stupid idea, but do you have the same problems with the "normal" sudo builds?


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22014136
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -0,0 +1,714 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager
    +
    +import java.io.{IOException, File}
    +import java.lang.management.{GarbageCollectorMXBean, MemoryMXBean, ManagementFactory}
    +import java.net.{InetAddress, InetSocketAddress}
    +import java.util
    +import java.util.concurrent.{FutureTask, TimeUnit}
    +
    +import akka.actor._
    +import akka.pattern.ask
    +import org.apache.flink.api.common.cache.DistributedCache
    +import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants, Configuration}
    +import org.apache.flink.core.fs.Path
    +import org.apache.flink.runtime.ActorLogMessages
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.blob.BlobCache
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager
    +import org.apache.flink.runtime.execution.{ExecutionState, RuntimeEnvironment}
    +import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
    +FallbackLibraryCacheManager, LibraryCacheManager}
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
    +import org.apache.flink.runtime.filecache.FileCache
    +import org.apache.flink.runtime.instance.{InstanceConnectionInfo, HardwareDescription, InstanceID}
    +import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync}
    +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager
    +import org.apache.flink.runtime.io.network.{NetworkConnectionManager, LocalConnectionManager,
    +ChannelManager}
    +import org.apache.flink.runtime.jobgraph.JobID
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
    +import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
    +import org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager,
    +AcknowledgeRegistration}
    +import org.apache.flink.runtime.messages.TaskManagerMessages._
    +import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnmonitorTask, MonitorTask,
    +RegisterProfilingListener}
    +import org.apache.flink.runtime.net.NetUtils
    +import org.apache.flink.runtime.profiling.ProfilingUtils
    +import org.apache.flink.runtime.util.EnvironmentInformation
    +import org.apache.flink.util.ExceptionUtils
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.convert.{WrapAsScala, DecorateAsScala}
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +import scala.util.Failure
    +import scala.util.Success
    +
    +class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
    +                  val taskManagerConfig: TaskManagerConfiguration,
    +                  val networkConnectionConfig: NetworkConnectionConfiguration)
    +  extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
    +
    +  import context._
    +  import taskManagerConfig.{timeout => tmTimeout, _}
    +  implicit val timeout = tmTimeout
    +
    +  log.info(s"Starting task manager at ${self.path}.")
    +
    +  val REGISTRATION_DELAY = 0 seconds
    +  val REGISTRATION_INTERVAL = 10 seconds
    +  val MAX_REGISTRATION_ATTEMPTS = 10
    +  val HEARTBEAT_INTERVAL = 5000 millisecond
    +
    +  TaskManager.checkTempDirs(tmpDirPaths)
    +  val ioManager = new IOManagerAsync(tmpDirPaths)
    +  val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
    +  val bcVarManager = new BroadcastVariableManager();
    +  val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
    +  val fileCache = new FileCache()
    +  val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
    +
    +  // Actors which want to be notified once this task manager has been registered at the job manager
    +  val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
    +
    +  val profiler = profilingInterval match {
    +    case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat,
    +      interval))
    +    case None => None
    +  }
    +
    +  var libraryCacheManager: LibraryCacheManager = null
    +  var channelManager: Option[ChannelManager] = None
    +  var registrationScheduler: Option[Cancellable] = None
    +  var registrationAttempts: Int = 0
    +  var registered: Boolean = false
    +  var currentJobManager = ActorRef.noSender
    +  var instanceID: InstanceID = null;
    +  var memoryMXBean: Option[MemoryMXBean] = None
    +  var gcMXBeans: Option[Iterable[GarbageCollectorMXBean]] = None
    +  var heartbeatScheduler: Option[Cancellable] = None
    +
    +  if (log.isDebugEnabled) {
    +    memoryLogggingIntervalMs.foreach {
    +      interval =>
    +        val d = FiniteDuration(interval, TimeUnit.MILLISECONDS)
    +        memoryMXBean = Some(ManagementFactory.getMemoryMXBean)
    +        gcMXBeans = Some(ManagementFactory.getGarbageCollectorMXBeans.asScala)
    +
    +        context.system.scheduler.schedule(d, d, self, LogMemoryUsage)
    +    }
    +  }
    +
    +  override def preStart(): Unit = {
    +    tryJobManagerRegistration()
    +  }
    +
    +  override def postStop(): Unit = {
    +    log.info(s"Stopping task manager ${self.path}.")
    +
    +    cancelAndClearEverything(new Exception("Task Manager is shutting down."))
    +
    +    heartbeatScheduler foreach {
    +      _.cancel()
    +    }
    +
    +    channelManager foreach {
    +      channelManager =>
    +        try {
    +          channelManager.shutdown()
    +        } catch {
    +          case t: Throwable =>
    +            log.error(t, "ChannelManager did not shutdown properly.")
    +        }
    +    }
    +
    +    ioManager.shutdown()
    +    memoryManager.shutdown()
    +    fileCache.shutdown()
    +
    +    if(libraryCacheManager != null){
    +      libraryCacheManager.shutdown()
    +    }
    +  }
    +
    +  def tryJobManagerRegistration(): Unit = {
    +    registrationAttempts = 0
    +    import context.dispatcher
    +    registrationScheduler = Some(context.system.scheduler.schedule(REGISTRATION_DELAY,
    +      REGISTRATION_INTERVAL, self, RegisterAtJobManager))
    +  }
    +
    +
    +  override def receiveWithLogMessages: Receive = {
    +    case RegisterAtJobManager => {
    +      registrationAttempts += 1
    +
    +      if (registered) {
    +        registrationScheduler.foreach(_.cancel())
    +      } else if (registrationAttempts <= MAX_REGISTRATION_ATTEMPTS) {
    +
    +        log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
    +          s"Attempt")
    +        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    +      } else {
    +        log.error("TaskManager could not register at JobManager.");
    +        self ! PoisonPill
    +      }
    +    }
    +
    +    case AcknowledgeRegistration(id, blobPort) => {
    +      if (!registered) {
    +        registered = true
    +        currentJobManager = sender
    +        instanceID = id
    +
    +        context.watch(currentJobManager)
    +
    +        log.info(s"TaskManager successfully registered at JobManager ${
    +          currentJobManager.path
    +            .toString
    +        }.")
    +
    +        setupChannelManager()
    +        setupLibraryCacheManager(blobPort)
    +
    +        heartbeatScheduler = Some(context.system.scheduler.schedule(HEARTBEAT_INTERVAL,
    +          HEARTBEAT_INTERVAL, self,
    +          SendHeartbeat))
    +
    +        profiler foreach {
    +          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
    +        }
    +
    +        for (listener <- waitForRegistration) {
    +          listener ! RegisteredAtJobManager
    +        }
    +
    +        waitForRegistration.clear()
    +      }
    +    }
    +
    +    case CancelTask(executionID) => {
    +      runningTasks.get(executionID) match {
    +        case Some(task) =>
    +          Future {
    +            task.cancelExecution()
    +          }
    +          sender ! new TaskOperationResult(executionID, true)
    +        case None =>
    +          sender ! new TaskOperationResult(executionID, false, "No task with that execution ID " +
    +            "was " +
    +            "found.")
    +      }
    +    }
    +
    +    case SubmitTask(tdd) => {
    +      val jobID = tdd.getJobID
    +      val vertexID = tdd.getVertexID
    +      val executionID = tdd.getExecutionId
    +      val taskIndex = tdd.getIndexInSubtaskGroup
    +      val numSubtasks = tdd.getCurrentNumberOfSubtasks
    +      var jarsRegistered = false
    +      var startRegisteringTask = 0L
    +
    +      try {
    +        if(log.isDebugEnabled){
    +          startRegisteringTask = System.currentTimeMillis()
    +        }
    +        libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
    +
    +        if(log.isDebugEnabled){
    +          log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
    +            startRegisteringTask)/1000.0}s")
    +        }
    +        jarsRegistered = true
    +
    +        val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
    +
    +        if (userCodeClassLoader == null) {
    +          throw new RuntimeException("No user code Classloader available.")
    +        }
    +
    +        val task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
    +          tdd.getTaskName, this)
    +
    +        runningTasks.put(executionID, task) match {
    +          case Some(_) => throw new RuntimeException(s"TaskManager contains already a task with " +
    +            s"executionID ${executionID}.")
    +          case None =>
    +        }
    +
    +        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
    +          executionID, timeout)
    +        val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
    +          ioManager, splitProvider, currentJobManager, bcVarManager)
    +
    +        task.setEnvironment(env)
    +
    +        // register the task with the network stack and profilers
    +        channelManager match {
    +          case Some(cm) => cm.register(task)
    +          case None => throw new RuntimeException("ChannelManager has not been properly " +
    +            "instantiated.")
    +        }
    +
    +        val jobConfig = tdd.getJobConfiguration
    +
    +        if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
    +          profiler match {
    +            case Some(profiler) => profiler ! MonitorTask(task)
    +            case None => log.info("There is no profiling enabled for the task manager.")
    +          }
    +        }
    +
    +        val cpTasks = new util.HashMap[String, FutureTask[Path]]()
    +
    +        for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
    +          val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
    +          cpTasks.put(entry.getKey, cp)
    +        }
    +        env.addCopyTasksForCacheFile(cpTasks)
    +
    +        if (!task.startExecution()) {
    +          throw new RuntimeException("Cannot start task. Task was canceled or failed.")
    +        }
    +
    +        sender ! TaskOperationResult(executionID, true)
    +      } catch {
    +        case t: Throwable =>
    --- End diff --
    
    I think you forgot to unregister the task from the channel manager?
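    
    The missing cleanup would look roughly like this (ChannelManager API stubbed for illustration; the real method names may differ):
    
        trait ToyChannelManager {
          def register(task: AnyRef): Unit
          def unregister(task: AnyRef): Unit
        }
        
        def submitTask(task: AnyRef, cm: ToyChannelManager)(start: => Boolean): Unit = {
          cm.register(task)
          try {
            if (!start) throw new RuntimeException("Cannot start task.")
          } catch {
            case t: Throwable =>
              cm.unregister(task) // undo the registration on any failure
              throw t
          }
        }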


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22013789
  
    --- Diff: flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java ---
    @@ -116,7 +117,9 @@ public void open(int taskNumber, int numTasks) throws IOException {
     		} catch (Exception e) {
     			throw new RuntimeException(e);
     		}
    -		
    +
    +		System.out.println("HadoopOutputFormat: Write to " + this.configuration.get("mapred" +
    --- End diff --
    
    These are just debug outputs, right? Just to make sure :)


---

[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22016075
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -0,0 +1,714 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager
    +
    +import java.io.{IOException, File}
    +import java.lang.management.{GarbageCollectorMXBean, MemoryMXBean, ManagementFactory}
    +import java.net.{InetAddress, InetSocketAddress}
    +import java.util
    +import java.util.concurrent.{FutureTask, TimeUnit}
    +
    +import akka.actor._
    +import akka.pattern.ask
    +import org.apache.flink.api.common.cache.DistributedCache
    +import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants, Configuration}
    +import org.apache.flink.core.fs.Path
    +import org.apache.flink.runtime.ActorLogMessages
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.blob.BlobCache
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager
    +import org.apache.flink.runtime.execution.{ExecutionState, RuntimeEnvironment}
    +import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
    +FallbackLibraryCacheManager, LibraryCacheManager}
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
    +import org.apache.flink.runtime.filecache.FileCache
    +import org.apache.flink.runtime.instance.{InstanceConnectionInfo, HardwareDescription, InstanceID}
    +import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync}
    +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager
    +import org.apache.flink.runtime.io.network.{NetworkConnectionManager, LocalConnectionManager,
    +ChannelManager}
    +import org.apache.flink.runtime.jobgraph.JobID
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
    +import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
    +import org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager,
    +AcknowledgeRegistration}
    +import org.apache.flink.runtime.messages.TaskManagerMessages._
    +import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnmonitorTask, MonitorTask,
    +RegisterProfilingListener}
    +import org.apache.flink.runtime.net.NetUtils
    +import org.apache.flink.runtime.profiling.ProfilingUtils
    +import org.apache.flink.runtime.util.EnvironmentInformation
    +import org.apache.flink.util.ExceptionUtils
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.convert.{WrapAsScala, DecorateAsScala}
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +import scala.util.Failure
    +import scala.util.Success
    +
    +class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
    +                  val taskManagerConfig: TaskManagerConfiguration,
    +                  val networkConnectionConfig: NetworkConnectionConfiguration)
    +  extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
    +
    +  import context._
    +  import taskManagerConfig.{timeout => tmTimeout, _}
    +  implicit val timeout = tmTimeout
    +
    +  log.info(s"Starting task manager at ${self.path}.")
    +
    +  val REGISTRATION_DELAY = 0 seconds
    +  val REGISTRATION_INTERVAL = 10 seconds
    +  val MAX_REGISTRATION_ATTEMPTS = 10
    +  val HEARTBEAT_INTERVAL = 5000 millisecond
    +
    +  TaskManager.checkTempDirs(tmpDirPaths)
    +  val ioManager = new IOManagerAsync(tmpDirPaths)
    +  val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
    +  val bcVarManager = new BroadcastVariableManager();
    +  val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
    +  val fileCache = new FileCache()
    +  val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
    +
    +  // Actors which want to be notified once this task manager has been registered at the job manager
    +  val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
    +
    +  val profiler = profilingInterval match {
    +    case Some(interval) => Some(TaskManager.startProfiler(self.path.toSerializationFormat,
    +      interval))
    +    case None => None
    +  }
    +
    +  var libraryCacheManager: LibraryCacheManager = null
    +  var channelManager: Option[ChannelManager] = None
    +  var registrationScheduler: Option[Cancellable] = None
    +  var registrationAttempts: Int = 0
    +  var registered: Boolean = false
    +  var currentJobManager = ActorRef.noSender
    +  var instanceID: InstanceID = null;
    +  var memoryMXBean: Option[MemoryMXBean] = None
    +  var gcMXBeans: Option[Iterable[GarbageCollectorMXBean]] = None
    +  var heartbeatScheduler: Option[Cancellable] = None
    +
    +  if (log.isDebugEnabled) {
    +    memoryLogggingIntervalMs.foreach {
    +      interval =>
    +        val d = FiniteDuration(interval, TimeUnit.MILLISECONDS)
    +        memoryMXBean = Some(ManagementFactory.getMemoryMXBean)
    +        gcMXBeans = Some(ManagementFactory.getGarbageCollectorMXBeans.asScala)
    +
    +        context.system.scheduler.schedule(d, d, self, LogMemoryUsage)
    +    }
    +  }
    +
    +  override def preStart(): Unit = {
    +    tryJobManagerRegistration()
    +  }
    +
    +  override def postStop(): Unit = {
    +    log.info(s"Stopping task manager ${self.path}.")
    +
    +    cancelAndClearEverything(new Exception("Task Manager is shutting down."))
    +
    +    heartbeatScheduler foreach {
    +      _.cancel()
    +    }
    +
    +    channelManager foreach {
    +      channelManager =>
    +        try {
    +          channelManager.shutdown()
    +        } catch {
    +          case t: Throwable =>
    +            log.error(t, "ChannelManager did not shutdown properly.")
    +        }
    +    }
    +
    +    ioManager.shutdown()
    +    memoryManager.shutdown()
    +    fileCache.shutdown()
    +
    +    if(libraryCacheManager != null){
    +      libraryCacheManager.shutdown()
    +    }
    +  }
    +
    +  def tryJobManagerRegistration(): Unit = {
    +    registrationAttempts = 0
    +    import context.dispatcher
    +    registrationScheduler = Some(context.system.scheduler.schedule(REGISTRATION_DELAY,
    +      REGISTRATION_INTERVAL, self, RegisterAtJobManager))
    +  }
    +
    +
    +  override def receiveWithLogMessages: Receive = {
    +    case RegisterAtJobManager => {
    +      registrationAttempts += 1
    +
    +      if (registered) {
    +        registrationScheduler.foreach(_.cancel())
    +      } else if (registrationAttempts <= MAX_REGISTRATION_ATTEMPTS) {
    +
    +        log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
    +          s"Attempt")
    +        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
    +      } else {
    +        log.error("TaskManager could not register at JobManager.");
    +        self ! PoisonPill
    +      }
    +    }
    +
    +    case AcknowledgeRegistration(id, blobPort) => {
    +      if (!registered) {
    +        registered = true
    +        currentJobManager = sender
    +        instanceID = id
    +
    +        context.watch(currentJobManager)
    +
    +        log.info(s"TaskManager successfully registered at JobManager ${
    +          currentJobManager.path
    +            .toString
    +        }.")
    +
    +        setupChannelManager()
    +        setupLibraryCacheManager(blobPort)
    +
    +        heartbeatScheduler = Some(context.system.scheduler.schedule(HEARTBEAT_INTERVAL,
    +          HEARTBEAT_INTERVAL, self,
    +          SendHeartbeat))
    +
    +        profiler foreach {
    +          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
    +        }
    +
    +        for (listener <- waitForRegistration) {
    +          listener ! RegisteredAtJobManager
    +        }
    +
    +        waitForRegistration.clear()
    +      }
    +    }
    +
    +    case CancelTask(executionID) => {
    +      runningTasks.get(executionID) match {
    +        case Some(task) =>
    +          Future {
    +            task.cancelExecution()
    +          }
    +          sender ! new TaskOperationResult(executionID, true)
    +        case None =>
    +          sender ! new TaskOperationResult(executionID, false,
    +            "No task with that execution ID was found.")
    +      }
    +    }
    +
    +    case SubmitTask(tdd) => {
    +      val jobID = tdd.getJobID
    +      val vertexID = tdd.getVertexID
    +      val executionID = tdd.getExecutionId
    +      val taskIndex = tdd.getIndexInSubtaskGroup
    +      val numSubtasks = tdd.getCurrentNumberOfSubtasks
    +      var jarsRegistered = false
    +      var startRegisteringTask = 0L
    +
    +      try {
    +        if (log.isDebugEnabled) {
    +          startRegisteringTask = System.currentTimeMillis()
    +        }
    +        libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles())
    +
    +        if (log.isDebugEnabled) {
    +          log.debug(s"Registering task ${executionID} took ${(System.currentTimeMillis() -
    +            startRegisteringTask)/1000.0}s.")
    +        }
    +        jarsRegistered = true
    +
    +        val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
    +
    +        if (userCodeClassLoader == null) {
    +          throw new RuntimeException("No user code Classloader available.")
    +        }
    +
    +        val task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
    +          tdd.getTaskName, this)
    +
    +        runningTasks.put(executionID, task) match {
    +          case Some(_) => throw new RuntimeException(s"TaskManager already contains a task " +
    +            s"with execution ID ${executionID}.")
    +          case None =>
    +        }
    +
    +        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
    +          executionID, timeout)
    +        val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
    +          ioManager, splitProvider, currentJobManager, bcVarManager)
    +
    +        task.setEnvironment(env)
    +
    +        // register the task with the network stack and profilers
    +        channelManager match {
    +          case Some(cm) => cm.register(task)
    +          case None => throw new RuntimeException("ChannelManager has not been properly " +
    +            "instantiated.")
    +        }
    +
    +        val jobConfig = tdd.getJobConfiguration
    +
    +        if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
    +          profiler match {
    +            case Some(profiler) => profiler ! MonitorTask(task)
    +            case None => log.info("There is no profiling enabled for the task manager.")
    +          }
    +        }
    +
    +        val cpTasks = new util.HashMap[String, FutureTask[Path]]()
    +
    +        for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
    +          val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
    +          cpTasks.put(entry.getKey, cp)
    +        }
    +        env.addCopyTasksForCacheFile(cpTasks)
    +
    +        if (!task.startExecution()) {
    +          throw new RuntimeException("Cannot start task. Task was canceled or failed.")
    +        }
    +
    +        sender ! TaskOperationResult(executionID, true)
    +      } catch {
    +        case t: Throwable =>
    --- End diff ---
    
    Really good point, Ufuk. Thanks for spotting the resource leak. I'll fix it.
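
    For context, a minimal sketch of the direction such a cleanup could take in the
    truncated catch block above. The `unregisterTask` call is an assumption (a
    counterpart to the `registerTask` call shown in the diff); the actual fix in the
    follow-up commit may differ:

        } catch {
          case t: Throwable =>
            log.error(t, s"Could not instantiate task with execution ID ${executionID}.")

            // assumed cleanup: release the jars registered above if the task never started
            if (jarsRegistered) {
              libraryCacheManager.unregisterTask(jobID, executionID)
            }

            sender ! new TaskOperationResult(executionID, false, t.getMessage)
        }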



[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22013711
  
    --- Diff: flink-addons/flink-avro/src/test/resources/log4j-test.properties ---
    @@ -16,4 +16,12 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF
    \ No newline at end of file
    +# Set root logger level to DEBUG and its only appender to A1.
    +log4j.rootLogger=OFF, A1
    --- End diff --
    
    I think it's a good idea to add this default layout and appender :+1: 
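
    For readers without the full patch: the appender definition is truncated above. A
    typical log4j 1.x console appender that completes such a configuration looks like
    the following (the exact conversion pattern in the patch may differ):

        # A1 is a ConsoleAppender with a timestamped pattern layout.
        log4j.appender.A1=org.apache.log4j.ConsoleAppender
        log4j.appender.A1.layout=org.apache.log4j.PatternLayout
        log4j.appender.A1.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n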



[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r21440673
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -0,0 +1,518 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager
    +
    +import java.io.File
    +import java.net.InetSocketAddress
    +import java.util.concurrent.TimeUnit
    +
    +import akka.actor._
    +import akka.pattern.Patterns
    +import akka.pattern.{ask, pipe}
    +import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
    +import org.apache.flink.core.io.InputSplitAssigner
    +import org.apache.flink.runtime.blob.BlobServer
    +import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
    +import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
    +import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
    +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
    +import org.apache.flink.runtime.taskmanager.TaskManager
    +import org.apache.flink.runtime.{JobException, ActorLogMessages}
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
    +import org.apache.flink.runtime.instance.InstanceManager
    +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
    +import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
    +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
    +import org.apache.flink.runtime.messages.JobManagerMessages._
    +import org.apache.flink.runtime.messages.RegistrationMessages._
    +import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, Heartbeat}
    +import org.apache.flink.runtime.profiling.ProfilingUtils
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.convert.WrapAsScala
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +
    +class JobManager(val configuration: Configuration)
    +  extends Actor with ActorLogMessages with ActorLogging with WrapAsScala {
    +  import context._
    +  implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
    +    ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
    +
    +  Execution.timeout = timeout
    +
    +  log.info("Starting job manager.")
    +
    +  val (archiveCount,
    +    profiling,
    +    cleanupInterval,
    +    defaultExecutionRetries,
    +    delayBetweenRetries) = JobManager.parseConfiguration(configuration)
    +
    +  // Props for the profiler actor
    +  def profilerProps: Props = Props(classOf[JobManagerProfiler])
    +
    +  // Props for the archive actor
    +  def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
    +
    +  val profiler = if (profiling) {
    +    Some(context.actorOf(profilerProps, JobManager.PROFILER_NAME))
    +  } else {
    +    None
    +  }
    +
    +  val archive = context.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
    +
    +  val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
    +  val instanceManager = new InstanceManager()
    +  val scheduler = new FlinkScheduler()
    +  val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval)
    +
    +  // Map of currently running jobs
    +  val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
    +
    +  // Map of actors which want to be notified once a specific job terminates
    +  val finalJobStatusListener = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
    +
    +  instanceManager.addInstanceListener(scheduler)
    +
    +  log.info(s"Started job manager. Waiting for incoming messages.")
    +
    +  override def postStop(): Unit = {
    +    log.info(s"Stopping job manager ${self.path}.")
    +    instanceManager.shutdown()
    +    scheduler.shutdown()
    +    libraryCacheManager.shutdown()
    +  }
    +
    +  override def receiveWithLogMessages: Receive = {
    +    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => {
    +      val taskManager = sender()
    +      val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
    +        hardwareInformation, numberOfSlots)
    +
    +      // to be notified when the taskManager is no longer reachable
    +//      context.watch(taskManager);
    +
    +      taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
    +    }
    +
    +    case RequestNumberRegisteredTaskManager => {
    +      sender() ! instanceManager.getNumberOfRegisteredTaskManagers
    +    }
    +
    +    case RequestTotalNumberOfSlots => {
    +      sender() ! instanceManager.getTotalNumberOfSlots
    +    }
    +
    +    case SubmitJob(jobGraph, listenToEvents, detach) => {
    +      try {
    +        if (jobGraph == null) {
    +          sender() ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" +
    +            " null."))
    +        } else {
    +
    +          log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}}).")
    +
    +          // Create the user code class loader
    +          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys)
    +
    +          val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(),
    +            (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
    +              jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys), JobInfo(sender(),
    +              System.currentTimeMillis())))
    +
    +          val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
    +            jobGraph.getNumberOfExecutionRetries
    +          } else {
    +            defaultExecutionRetries
    +          }
    +
    +          executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
    +          executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
    +
    +          val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
    +
    +          if (userCodeLoader == null) {
    +            throw new JobException("The user code class loader could not be initialized.")
    +          }
    +
    +          log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${jobGraph
    +            .getName}).")
    +
    +          for (vertex <- jobGraph.getVertices) {
    +            val executableClass = vertex.getInvokableClassName
    +            if (executableClass == null || executableClass.length == 0) {
    +              throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
    +                s"invokable class.")
    +            }
    +
    +            vertex.initializeOnMaster(userCodeLoader)
    +          }
    +
    +          // topological sorting of the job vertices
    +          val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources
    +
    +          log.debug(s"Adding ${sortedTopology.size()} vertices from job graph ${jobGraph
    +            .getJobID} (${jobGraph.getName}).")
    +
    +          executionGraph.attachJobGraph(sortedTopology)
    +
    +          log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
    +            s"(${jobGraph.getName}).")
    +
    +          executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
    +
    +          // get notified about job status changes
    +          executionGraph.registerJobStatusListener(self)
    +
    +          if (listenToEvents) {
    +            // the sender will be notified about state changes
    +            executionGraph.registerExecutionListener(sender())
    +            executionGraph.registerJobStatusListener(sender())
    +          }
    +
    +          jobInfo.detach = detach
    +
    +          log.info(s"Scheduling job ${jobGraph.getName}.")
    +
    +          executionGraph.scheduleForExecution(scheduler)
    +
    +          sender() ! SubmissionSuccess(jobGraph.getJobID)
    +        }
    +      } catch {
    +        case t: Throwable =>
    +          log.error(t, "Job submission failed.")
    +
    +          currentJobs.get(jobGraph.getJobID) match {
    +            case Some((executionGraph, jobInfo)) =>
    +              executionGraph.fail(t)
    +
    +              // don't send the client the final job status because it already receives a
    +              // SubmissionFailure
    +              jobInfo.detach = true
    +
    +              val status = Patterns.ask(self, RequestFinalJobStatus(jobGraph.getJobID), 10 seconds)
    +              status.onFailure{
    +                case _: Throwable => self ! JobStatusChanged(executionGraph.getJobID,
    +                  JobStatus.FAILED, System.currentTimeMillis(),
    +                  s"Cleanup job ${jobGraph.getJobID}.")
    +              }
    +            case None =>
    +              libraryCacheManager.unregisterJob(jobGraph.getJobID)
    +              currentJobs.remove(jobGraph.getJobID)
    +
    +          }
    +
    +          sender() ! SubmissionFailure(jobGraph.getJobID, t)
    +      }
    +    }
    +
    +    case CancelJob(jobID) => {
    +      log.info(s"Trying to cancel job with ID ${jobID}.")
    +
    +      currentJobs.get(jobID) match {
    +        case Some((executionGraph, _)) =>
    +          Future {
    +            executionGraph.cancel()
    +          }
    +          sender() ! CancellationSuccess(jobID)
    +        case None =>
    +          log.info(s"No job found with ID ${jobID}.")
    +          sender() ! CancellationFailure(jobID, new IllegalArgumentException(s"No job found with " +
    +            s"ID ${jobID}."))
    +      }
    +    }
    +
    +    case UpdateTaskExecutionState(taskExecutionState) => {
    +      if (taskExecutionState == null) {
    +        sender() ! false
    +      } else {
    +        currentJobs.get(taskExecutionState.getJobID) match {
    +          case Some((executionGraph, _)) =>
    +            sender() ! executionGraph.updateState(taskExecutionState)
    +          case None =>
    +            log.error(s"Cannot find execution graph for ID ${taskExecutionState.getJobID} " +
    +              s"to change state to ${taskExecutionState.getExecutionState}.")
    +            sender() ! false
    +        }
    +      }
    +    }
    +
    +    case RequestNextInputSplit(jobID, vertexID) => {
    +      val nextInputSplit = currentJobs.get(jobID) match {
    +        case Some((executionGraph,_)) => executionGraph.getJobVertex(vertexID) match {
    +          case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
    +            case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
    --- End diff --
    
    Good point, I probably just copied the old bug.
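
    For context, the discussion refers to the `null` host argument in
    `getNextInputSplit(null)` above: the assigner receives no locality information
    about the requesting task. A hedged sketch of the direction a fix could take,
    assuming the requester's hostname can be derived from the sender's actor address
    (the era's `InputSplitAssigner` API may differ):

        case RequestNextInputSplit(jobID, vertexID) => {
          val nextInputSplit = currentJobs.get(jobID) match {
            case Some((executionGraph, _)) =>
              executionGraph.getJobVertex(vertexID) match {
                case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
                  case splitAssigner: InputSplitAssigner =>
                    // hypothetical: pass the requester's host instead of null so that
                    // locality-aware assigners can prefer local splits
                    val host = sender().path.address.host.orNull
                    splitAssigner.getNextInputSplit(host)
                  case _ => null
                }
                case _ => null
              }
            case None => null
          }
          sender() ! NextInputSplit(nextInputSplit)
        }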



[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r20710194
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.minicluster
    +
    +import java.util.concurrent.TimeUnit
    +
    +import akka.pattern.ask
    +import akka.actor.{ActorRef, ActorSystem}
    +import org.apache.flink.configuration.{ConfigConstants, Configuration}
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
    +import org.slf4j.LoggerFactory
    +
    +import scala.concurrent.duration.FiniteDuration
    +import scala.concurrent.{Future, Await}
    +
    +abstract class FlinkMiniCluster(userConfiguration: Configuration) {
    --- End diff --
    
    The user config argument is a great idea. I was checking this because I need something similar for my changes. I will add a similar thing to my branch for now.
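
    A sketch of the pattern under discussion, merging user-supplied values over the
    cluster's defaults. The `getDefaultConfig` helper is an assumption for
    illustration, and `addAll` is assumed to overwrite existing keys so that user
    values take precedence:

        abstract class FlinkMiniCluster(userConfiguration: Configuration) {
          // assumed helper that supplies the cluster's built-in defaults
          def getDefaultConfig: Configuration

          // defaults first, then the user configuration on top
          val configuration: Configuration = {
            val config = getDefaultConfig
            config.addAll(userConfiguration)
            config
          }
        }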



[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by hsaputra <gi...@git.apache.org>.
Github user hsaputra commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-64070161
  
    Looks like a rebase is needed for this PR?



[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/149#issuecomment-67423845
  
    It looks like it was really hard to catch. :-) Congrats! I don't know how much work it is, but we might want to make sure we have a test case for this (if possible), or add an extra comment so that no one messes with the patch. ;-)



[GitHub] incubator-flink pull request: [FLINK-1019] Implementation of akka ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/149

