You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/01 13:33:20 UTC
[jira] [Commented] (FLINK-4455) Replace ActorGateways in
NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455396#comment-15455396 ]
ASF GitHub Bot commented on FLINK-4455:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2449#discussion_r77173923
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
@@ -18,130 +18,88 @@
package org.apache.flink.runtime.io.network;
-import akka.dispatch.OnFailure;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
-import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Network I/O components of each {@link TaskManager} instance. The network environment contains
* the data structures that keep track of all intermediate results and all data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network buffer pool.
- * All other components (netty, intermediate result managers, ...) are only created once the
- * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
*/
public class NetworkEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
private final Object lock = new Object();
- private final NetworkEnvironmentConfiguration configuration;
-
- private final FiniteDuration jobManagerTimeout;
-
private final NetworkBufferPool networkBufferPool;
- private ConnectionManager connectionManager;
+ private final ConnectionManager connectionManager;
- private ResultPartitionManager partitionManager;
+ private final ResultPartitionManager resultPartitionManager;
- private TaskEventDispatcher taskEventDispatcher;
-
- private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
- private PartitionStateChecker partitionStateChecker;
+ private final TaskEventDispatcher taskEventDispatcher;
/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
- private KvStateServer kvStateServer;
+ private final KvStateServer kvStateServer;
/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
- private KvStateRegistry kvStateRegistry;
+ private final KvStateRegistry kvStateRegistry;
- private boolean isShutdown;
+ private final IOMode defaultIOMode;
- /**
- * ExecutionEnvironment which is used to execute remote calls with the
- * {@link JobManagerResultPartitionConsumableNotifier}
- */
- private final ExecutionContext executionContext;
+ private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
- private final InstanceConnectionInfo connectionInfo;
+ private boolean isShutdown;
--- End diff --
I would make this `volatile`. Good practice for such status flags. Makes the class by itself pretty safe (members are all final or volatile).
> Replace ActorGateways in NetworkEnvironment by interfaces
> ---------------------------------------------------------
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
> Issue Type: Improvement
> Components: Network, TaskManager
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the dependency on actors.
> In terms of modularization and an improved abstraction (especially wrt Flip-6) I propose to replace the {{ActorGateways}} by interfaces which exposes the required methods. The current implementation would then simply wrap the method calls in messages and send them via the {{ActorGateway}} to the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement these interfaces as part of their RPC contract.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)