You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/09/01 14:55:58 UTC

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2456

    [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvironment

    Replaces the `ActorGateway` in `Task` and `RuntimeEnvironment` by interfaces to decouple these components from the actors.
    
    - Introduces a `TaskExecutionStateListener` interface for `TaskExecutionState` update messages
    - Replaces the job manager `ActorGateway` by `InputSplitProvider` and `CheckpointNotifier`
    - Replaces the task manager `ActorGateway` by `TaskManagerConnection`
    
    The implementations using the `ActorGateways` are
    
    - `InputSplitProvider` --> `TaskInputSplitProvider`
    - `TaskExecutionStateListener` --> `ActorGatewayTaskExecutionStateListener`
    - `CheckpointNotifier` --> `ActorGatewayCheckpointNotifier`
    - `TaskManagerConnection` --> `ActorGatewayTaskManagerConnection`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink FLINK-4456

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2456
    
----
commit 578b2ef0bacc57f093150e4addd56b833ebbdf05
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-01T12:41:44Z

    [FLINK-4456] Introduce TaskExecutionStateListener for Task

commit bd85b45e61160e5d86578da1e52c60ef1bde7c10
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-01T13:50:36Z

    Replace JobManagerGateway in Task by InputSplitProvider and CheckpointNotifier

commit d665583ca03e6748187ada46ce30c39704b17fd8
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-01T14:36:31Z

    Replace the TaskManager ActorGateway by TaskManagerConnection in Task

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2456
  
    I will rebase the PR on the latest master and if Travis gives green light, I'd like to merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2456


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2456#discussion_r77508870
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java ---
    @@ -35,4 +35,16 @@
     	 *         task shall not consume any further input splits.
     	 */
     	InputSplit getNextInputSplit();
    +
    +	/**
    +	 * Starts the input split provider with a user code class loader.
    +	 *
    +	 * @param userCodeClassLoader User code class loader to use by the input split provider
    +	 */
    +	void start(ClassLoader userCodeClassLoader);
    --- End diff --
    
    The `start` method's intention is to pass in the `userCodeClassLoader` which is currently created in `Task#run` method. There are two other ways to solve the problem. Either creating the user code class loader outside of `Task` where the `InputSplitProvider` is created or to pass the user code class loader via the `getNextInputSplit` method call to the input split provider.
    
    For the first approach: Creating the user code class loader is a blocking operation, so this would have to executed in a future and upon completion we could create the `Task` instance in the `TaskManager`.
    
    
    For the second approach: We would have to touch more code but I think everywhere were the `getNextInputSplit` method is called, we have access to the user code class loader.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2456
  
    I've addressed the comments. In order to solve the problem with the user code class loader, I decided to pass it into the `InputSplitProvider` via the `getNextInputSplit` method. That way, we don't have to know the user code class loader when creating the `InputSplitProvider` implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2456#discussion_r77491885
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.state.ChainedStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +
    +import java.util.List;
    +
    +/**
    + * Notifier for checkpoint acknowledge and decline messages in the {@link Task}.
    + */
    +public interface CheckpointNotifier {
    --- End diff --
    
    `CheckpointNotifier` is already used for user functions, as an interface to get notifications on completed checkpoints.
    
    How about calling this `CheckpointResponder` or something like that instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2456#discussion_r77492066
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager;
    +
    +public interface TaskExecutionStateListener {
    +
    +	/**
    +	 * Called whenever the task's execution state changes
    +	 *
    +	 * @param taskExecutionState describing the task execution state change
    +	 */
    +	void notifyTaskExecutionState(TaskExecutionState taskExecutionState);
    --- End diff --
    
    `notifyTaskExecutionStateChanged(...)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2456
  
    I think this is good.
    Few comments inline on the names, the hardest problem usually ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2456#discussion_r77492004
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java ---
    @@ -35,4 +35,16 @@
     	 *         task shall not consume any further input splits.
     	 */
     	InputSplit getNextInputSplit();
    +
    +	/**
    +	 * Starts the input split provider with a user code class loader.
    +	 *
    +	 * @param userCodeClassLoader User code class loader to use by the input split provider
    +	 */
    +	void start(ClassLoader userCodeClassLoader);
    --- End diff --
    
    Is it possible to have the `start()` and `stop()` methods not in the base interface, but only in the Akka specific implementation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2456#discussion_r77508917
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager;
    +
    +public interface TaskExecutionStateListener {
    +
    +	/**
    +	 * Called whenever the task's execution state changes
    +	 *
    +	 * @param taskExecutionState describing the task execution state change
    +	 */
    +	void notifyTaskExecutionState(TaskExecutionState taskExecutionState);
    --- End diff --
    
    Yes, can rename.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2456#discussion_r77508238
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.state.ChainedStateHandle;
    +import org.apache.flink.runtime.state.KeyGroupsStateHandle;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +
    +import java.util.List;
    +
    +/**
    + * Notifier for checkpoint acknowledge and decline messages in the {@link Task}.
    + */
    +public interface CheckpointNotifier {
    --- End diff --
    
    Oh I've overlooked this class. Will rename it into `CheckpointResponder`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---