Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/08/31 23:48:17 UTC

[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2449

    [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnvironment independent of ActorGateway and JobManager association

    Makes the NetworkEnvironment independent of the JobManager association. This means that the
    NetworkEnvironment, and with it the ConnectionManager, is started before the TaskManager actor
    is executed. Furthermore, the ConnectionManager keeps running even if the JobManager is
    disassociated. As part of this remodelling, the PartitionStateChecker and
    the ResultPartitionConsumableNotifier, which depend on the JobManager association, were moved
    out of the NetworkEnvironment. They are now contained in the SlotEnvironment, which is
    set up when the TaskManager connects to a JobManager. The SlotEnvironment contains all
    information related to the associated JobManager. Since all slots are implicitly associated
    with the JobManager which is the leader, we only create one SlotEnvironment, which is shared
    by all Tasks.
    
    Introduce SlotEnvironment to accommodate the PartitionStateChecker and ResultPartitionConsumableNotifier
    
    Remove the PartitionStateChecker and the ResultPartitionConsumableNotifier from the
    NetworkEnvironment. Start the NetworkEnvironment when the TaskManager components are
    created. Keep the NetworkEnvironment running also when the JobManager is disassociated.
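
The lifecycle described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `NetworkEnvironmentSketch`, `JobManagerConnectionSketch`, and `TaskManagerSketch` are hypothetical stand-ins, not the classes touched by this pull request, and the real components carry far more state.

```java
/** Hypothetical stand-in for the NetworkEnvironment: owns only JobManager-independent state. */
class NetworkEnvironmentSketch {
    private final Object lock = new Object();
    private boolean running; // guarded by lock

    void start() {
        synchronized (lock) {
            running = true; // the real class would start its network components here
        }
    }

    void shutdown() {
        synchronized (lock) {
            running = false;
        }
    }

    boolean isRunning() {
        synchronized (lock) {
            return running;
        }
    }
}

/** Hypothetical holder for everything that only makes sense while a JobManager is associated. */
final class JobManagerConnectionSketch {
    final String jobManagerAddress;

    JobManagerConnectionSketch(String jobManagerAddress) {
        // The real holder would also carry the partition state checker and the notifier.
        this.jobManagerAddress = jobManagerAddress;
    }
}

/** Hypothetical TaskManager: the network side outlives any single JobManager association. */
class TaskManagerSketch {
    private final NetworkEnvironmentSketch network = new NetworkEnvironmentSketch();
    private JobManagerConnectionSketch connection; // null while no JobManager is associated

    TaskManagerSketch() {
        network.start(); // started when the components are created, before any association
    }

    void associateWithJobManager(String address) {
        connection = new JobManagerConnectionSketch(address);
    }

    void disassociateFromJobManager() {
        connection = null; // only the JobManager-specific part is dropped
    }

    NetworkEnvironmentSketch network() {
        return network;
    }

    JobManagerConnectionSketch connection() {
        return connection;
    }
}
```

In this sketch, disassociating drops only the connection object; the network side stays up until the TaskManager itself shuts it down.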

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink FLINK-4455

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2449.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2449
    
----
commit 96b1772d4581365eeec6614827b36f76a83ba5d0
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-08-31T07:33:46Z

    [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnvironment independent of ActorGateway and JobManager association
    

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2449
  
    Thanks for the quick and thorough review @StephanEwen.
    
    - Concerning the `ExecutionContext` in the `TaskManager`: This is simply the `ExecutionContext` which was previously passed to the `NetworkEnvironment` but is no longer needed there. I agree that we should be careful with thread pool creations. At the moment, we create an additional execution context for each `JobManager` and `TaskManager`. The advantage of this approach is that we separate the actor execution from the execution of other components. Thus, another component cannot starve the actor's execution context and thereby affect its responsiveness. But I agree that the few additional future callbacks shouldn't be a big burden for the `TaskManager` actor system. Will change it.
    
    - It is true that there is no notion of slots on the `TaskManager` right now. I've named it `SlotEnvironment` because with the Flip-6 refactorings we need to maintain multiple job manager connections and every task runs in a slot which is associated with a JobManager. Renaming it to `JobManagerConnection` should better reflect what it actually is.
    
    - Yes, that is a good idea. Will try to attach the `ResultPartitionConsumableNotifier` to the `Task`.
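
The trade-off in the first point (a dedicated pool versus running the few extra callbacks on an executor that already exists) might look like the following in plain Java. This is a sketch under assumptions: `SharedExecutorSketch` and `notifyConsumable` are hypothetical names, and `CompletableFuture` stands in for the Scala futures used in the actual code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SharedExecutorSketch {

    /**
     * Runs a (hypothetical) notification and its callback on the executor supplied by the
     * caller instead of creating a new thread pool for them.
     */
    static CompletableFuture<String> notifyConsumable(String partitionId, Executor callbackExecutor) {
        return CompletableFuture
            .supplyAsync(() -> "ack:" + partitionId, callbackExecutor)
            .thenApplyAsync(ack -> ack + ":done", callbackExecutor);
    }

    public static void main(String[] args) {
        // Stand-in for an executor that already exists in the process.
        ExecutorService shared = Executors.newFixedThreadPool(2);
        try {
            System.out.println(notifyConsumable("p-1", shared).join());
        } finally {
            // Whoever creates a pool also has to shut it down, otherwise its threads leak.
            shared.shutdown();
        }
    }
}
```

Passing the executor in keeps the decision about thread pools with the caller, so no component creates one implicitly.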



[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2449
  
    Looks good to me, +1



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77194470
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
    --- End diff --
    
    True, will refactor it.



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77195218
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
     
    -	private final InstanceConnectionInfo connectionInfo;
    +	private boolean isShutdown;
    --- End diff --
    
    That's a good point. I'll actually synchronize on `lock` because all other accesses to `isShutdown` also happen under `lock`. This eliminates the case where `isShutdown` returns false but some of the `NetworkEnvironment`'s components are already shut down.
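
A minimal sketch of the locking approach described in this comment, assuming a single `componentsRunning` boolean as a hypothetical stand-in for the real components:

```java
class ShutdownFlagUnderLock {
    private final Object lock = new Object();
    private boolean isShutdown;               // guarded by lock
    private boolean componentsRunning = true; // guarded by lock; stands in for the real components

    void shutdown() {
        synchronized (lock) {
            if (isShutdown) {
                return; // already shut down, nothing to do
            }
            componentsRunning = false; // tear down components ...
            isShutdown = true;         // ... and flip the flag inside the same critical section
        }
    }

    boolean isShutdown() {
        // Reading under the same lock means a caller never observes "false"
        // while shutdown() is halfway through tearing things down.
        synchronized (lock) {
            return isShutdown;
        }
    }

    boolean componentsRunning() {
        synchronized (lock) {
            return componentsRunning;
        }
    }
}
```

Because the reader takes the same lock as `shutdown()`, it observes either the state before the teardown or the state after it, never a mix of the two.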



[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2449
  
    I've addressed your comments @StephanEwen.



[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2449
  
    +1 for moving the `PartitionStateChecker` and `ResultPartitionConsumableNotifier` out of the `NetworkEnvironment`.
    
    Few questions and comments:
    
      - Do we need an extra ExecutorService in the TaskManager? I have been digging through a bunch of thread dumps over time and there are already many threads and pools. I would really like to avoid having yet another thread pool (creating thread pools should be an extremely careful decision).
    
      The Akka thread pool executor is quite over-provisioned for the few actors we actually use. I think it is perfectly feasible to use that one for the few extra futures introduced here. In any case, if not reusing the Akka executor pool, then the thread pool needs to be shut down in the TaskManager runner. Otherwise it creates a leak when running successive local Flink jobs.
    
      - I am a bit confused about the `SlotEnvironment`. Maybe it is mainly the name, but what does it have to do with the slots? Is it not more like a network-messages specific *JobManager Connection*?
      
      - The `ResultPartitionConsumableNotifier` could be per `Task` - that way, future multi-JobManager associations would work seamlessly and it could directly call `fail(...)` on the Task without having to go through the `TaskManager`. It could probably leave the TaskManager out of the picture completely.
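
The last point (a notifier per `Task` that fails the task directly) could be sketched like this. All types here are hypothetical simplifications: `TaskSketch`, `JobManagerGatewaySketch`, and `PerTaskConsumableNotifier` are illustration-only names, and the method `scheduleOrUpdateConsumers` is merely named after the message of the same name in the diff.

```java
/** Hypothetical minimal task handle; the real Task class has a much larger surface. */
class TaskSketch {
    private Throwable failureCause;

    synchronized void fail(Throwable cause) {
        if (failureCause == null) {
            failureCause = cause; // keep the first failure only
        }
    }

    synchronized boolean hasFailed() {
        return failureCause != null;
    }
}

/** Hypothetical gateway to the one JobManager this task belongs to. */
interface JobManagerGatewaySketch {
    void scheduleOrUpdateConsumers(String partitionId) throws Exception;
}

/** One notifier per task: it knows its task and its JobManager, nothing else. */
class PerTaskConsumableNotifier {
    private final TaskSketch task;
    private final JobManagerGatewaySketch jobManager;

    PerTaskConsumableNotifier(TaskSketch task, JobManagerGatewaySketch jobManager) {
        this.task = task;
        this.jobManager = jobManager;
    }

    void notifyPartitionConsumable(String partitionId) {
        try {
            jobManager.scheduleOrUpdateConsumers(partitionId);
        } catch (Exception e) {
            // Fail the owning task directly instead of routing through the TaskManager.
            task.fail(new RuntimeException("Could not notify the JobManager about " + partitionId, e));
        }
    }
}
```

Since each notifier is bound to exactly one task and one gateway, tasks that belong to different JobManagers need no shared lookup.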



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77174082
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -476,6 +252,29 @@ public void unregisterTask(Task task) {
     		}
     	}
     
    +	public void start() throws IOException {
    +		synchronized (lock) {
    +			LOG.info("Starting the network environment and its components.");
    --- End diff --
    
    Check for `isShutdown` here to be safe.
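
A sketch of what such a guard could look like, assuming the flag is read under the same `lock` that `start()` already holds. `StartGuardSketch` is a hypothetical stand-in, and throwing `IllegalStateException` is an assumption made for illustration, not necessarily what the final code does.

```java
class StartGuardSketch {
    private final Object lock = new Object();
    private boolean isShutdown; // guarded by lock
    private boolean started;    // guarded by lock

    void start() {
        synchronized (lock) {
            // Refuse to start an environment that has already been shut down.
            if (isShutdown) {
                throw new IllegalStateException("The network environment has already been shut down.");
            }
            started = true; // the real method would start its components here
        }
    }

    void shutdown() {
        synchronized (lock) {
            started = false;
            isShutdown = true;
        }
    }

    boolean isStarted() {
        synchronized (lock) {
            return started;
        }
    }
}
```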



[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2449
  
    Build has passed on Travis with local repo. Will merge the PR.



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77194531
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -476,6 +252,29 @@ public void unregisterTask(Task task) {
     		}
     	}
     
    +	public void start() throws IOException {
    +		synchronized (lock) {
    +			LOG.info("Starting the network environment and its components.");
    --- End diff --
    
    Yes, that's true. Will change it.



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2449



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77173923
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
     
    -	private final InstanceConnectionInfo connectionInfo;
    +	private boolean isShutdown;
    --- End diff --
    
    I would make this `volatile`. Good practice for such status flags. Makes the class by itself pretty safe (members are all final or volatile).
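
For illustration, a `volatile` status flag in isolation might look as follows. `VolatileFlagSketch` is a hypothetical class, not part of the patch. Note that `volatile` only guarantees that a write to the flag becomes visible to other threads; it does not make the flag and the state of other components change together.

```java
class VolatileFlagSketch {
    // volatile: a write by one thread is visible to subsequent reads by other threads
    private volatile boolean isShutdown;

    void shutdown() {
        isShutdown = true;
    }

    boolean isShutdown() {
        return isShutdown; // lock-free read
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileFlagSketch env = new VolatileFlagSketch();
        // The watcher spins until it observes the flag; without volatile this
        // loop would not be guaranteed to terminate.
        Thread watcher = new Thread(() -> {
            while (!env.isShutdown()) {
                Thread.yield();
            }
        });
        watcher.start();
        env.shutdown();
        watcher.join();
        System.out.println("watcher observed shutdown");
    }
}
```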



[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77175883
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
    --- End diff --
    
    This should probably be two separate `int` fields rather than a `Tuple2<Integer, Integer>`.
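
A minimal sketch of what the comment above suggests, assuming the tuple holds the initial and maximum partition request backoff as its name implies. The class name `NetworkEnvironmentSketch` and the field and getter names are hypothetical and are not taken from the PR; the real `NetworkEnvironment` has many more members.

```java
/**
 * Hypothetical stand-in for the relevant part of NetworkEnvironment.
 * Instead of one Tuple2<Integer, Integer>, the two backoff values are
 * kept as separate primitive fields with descriptive names.
 */
final class NetworkEnvironmentSketch {

    // Assumed meaning: first backoff used when requesting a partition.
    private final int partitionRequestInitialBackoff;

    // Assumed meaning: upper bound for the backoff.
    private final int partitionRequestMaxBackoff;

    NetworkEnvironmentSketch(int partitionRequestInitialBackoff, int partitionRequestMaxBackoff) {
        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
    }

    int getPartitionRequestInitialBackoff() {
        return partitionRequestInitialBackoff;
    }

    int getPartitionRequestMaxBackoff() {
        return partitionRequestMaxBackoff;
    }
}
```

Two primitive fields avoid boxing and spare callers from having to remember which of `_1` and `_2` is the initial value and which is the maximum.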


---

[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2449
  
    Tests have passed locally. Will rebase, check again on Travis, and then merge the PR.


---