Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/01 13:33:20 UTC

[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

    [ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455396#comment-15455396 ] 

ASF GitHub Bot commented on FLINK-4455:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77173923
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
     
    -	private final InstanceConnectionInfo connectionInfo;
    +	private boolean isShutdown;
    --- End diff --
    
    I would make this `volatile`, which is good practice for status flags like this one. It also makes the class pretty safe by itself, since all members would then be either final or volatile.
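
To illustrate the suggestion, here is a minimal sketch of a `volatile` shutdown flag. It is not the code from the pull request: the class name `ShutdownFlagSketch` and both method bodies are hypothetical, and guarding `shutdown()` with `lock` is an assumption made for the example. Only the `lock` and `isShutdown` field names are taken from the diff above.

```java
// Hypothetical sketch, not part of Flink: a status flag that is written
// under a lock but read without one.
final class ShutdownFlagSketch {

    private final Object lock = new Object();

    // volatile guarantees that a write made by the thread calling shutdown()
    // is visible to other threads that read the flag without taking the lock.
    private volatile boolean isShutdown;

    void shutdown() {
        synchronized (lock) {
            if (isShutdown) {
                return; // already shut down, nothing left to release
            }
            // ... release buffers, connections, etc. (omitted) ...
            isShutdown = true;
        }
    }

    boolean isShutdown() {
        return isShutdown; // lock-free read, safe because the field is volatile
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownFlagSketch env = new ShutdownFlagSketch();
        Thread stopper = new Thread(env::shutdown);
        stopper.start();
        stopper.join();
        System.out.println("shut down: " + env.isShutdown());
    }
}
```

Without `volatile`, a thread calling `isShutdown()` outside the lock would have no visibility guarantee for a write made by another thread.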


> Replace ActorGateways in NetworkEnvironment by interfaces
> ---------------------------------------------------------
>
>                 Key: FLINK-4455
>                 URL: https://issues.apache.org/jira/browse/FLINK-4455
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network, TaskManager
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the dependency on actors. 
> In terms of modularization and an improved abstraction (especially with respect to Flip-6), I propose to replace the {{ActorGateways}} with interfaces which expose the required methods. The current implementation would then simply wrap the method calls in messages and send them via the {{ActorGateway}} to the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement these interfaces as part of their RPC contract.
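
The proposal above can be sketched roughly as follows. Every name in this example (`MessageGateway`, `ConsumableNotifier`, `GatewayConsumableNotifier`, `PartitionConsumableMessage`) is hypothetical and chosen only for illustration; none of them is Flink's actual API, and `MessageGateway` is merely a stand-in for an actor gateway that accepts opaque messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo entry point; declared first so the file can be launched directly.
final class NotifierSketch {
    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        // The "gateway" here just records messages instead of sending them.
        ConsumableNotifier notifier = new GatewayConsumableNotifier(sent::add);
        notifier.notifyPartitionConsumable("partition-1");
        System.out.println("messages sent: " + sent.size());
    }
}

// Hypothetical stand-in for an actor gateway: it only knows how to send messages.
interface MessageGateway {
    void tell(Object message);
}

// Hypothetical message carrying the call's argument.
final class PartitionConsumableMessage {
    final String partitionId;

    PartitionConsumableMessage(String partitionId) {
        this.partitionId = partitionId;
    }
}

// The interface the network component would depend on; it mentions no actors.
interface ConsumableNotifier {
    void notifyPartitionConsumable(String partitionId);
}

// Actor-based implementation: wraps each method call in a message and
// forwards it through the gateway, as the issue description suggests.
final class GatewayConsumableNotifier implements ConsumableNotifier {
    private final MessageGateway gateway;

    GatewayConsumableNotifier(MessageGateway gateway) {
        this.gateway = gateway;
    }

    @Override
    public void notifyPartitionConsumable(String partitionId) {
        gateway.tell(new PartitionConsumableMessage(partitionId));
    }
}
```

With this shape, the caller depends only on the interface, so an RPC-based component could implement `ConsumableNotifier` directly without any message wrapping.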


