You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2016/09/01 13:32:29 UTC

[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77173923
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The network environment contains
      * the data structures that keep track of all intermediate results and all data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network buffer pool.
    - * All other components (netty, intermediate result managers, ...) are only created once the
    - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
     	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
     
     	private final Object lock = new Object();
     
    -	private final NetworkEnvironmentConfiguration configuration;
    -
    -	private final FiniteDuration jobManagerTimeout;
    -
     	private final NetworkBufferPool networkBufferPool;
     
    -	private ConnectionManager connectionManager;
    +	private final ConnectionManager connectionManager;
     
    -	private ResultPartitionManager partitionManager;
    +	private final ResultPartitionManager resultPartitionManager;
     
    -	private TaskEventDispatcher taskEventDispatcher;
    -
    -	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -	private PartitionStateChecker partitionStateChecker;
    +	private final TaskEventDispatcher taskEventDispatcher;
     
     	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
    -	private KvStateServer kvStateServer;
    +	private final KvStateServer kvStateServer;
     
     	/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
    -	private KvStateRegistry kvStateRegistry;
    +	private final KvStateRegistry kvStateRegistry;
     
    -	private boolean isShutdown;
    +	private final IOMode defaultIOMode;
     
    -	/**
    -	 * ExecutionEnvironment which is used to execute remote calls with the
    -	 * {@link JobManagerResultPartitionConsumableNotifier}
    -	 */
    -	private final ExecutionContext executionContext;
    +	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
     
    -	private final InstanceConnectionInfo connectionInfo;
    +	private boolean isShutdown;
    --- End diff --
    
    I would make this `volatile`. That is good practice for status flags like this one, and it makes the class pretty safe by itself (all members would then be either final or volatile).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---