You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:30 UTC

[25/50] [abbrv] flink git commit: [hotfix] [taskmanager] Fixes TaskManager component creation at startup

[hotfix] [taskmanager] Fixes TaskManager component creation at startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a17c106c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a17c106c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a17c106c

Branch: refs/heads/flip-6
Commit: a17c106c756ab13540148cb264d96e094c0449b0
Parents: 64e0205
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 8 18:43:15 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:44 2016 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 189 ++++++++++++++++---
 .../taskexecutor/TaskExecutorConfiguration.java |   9 -
 2 files changed, 159 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a17c106c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 735730b..a455fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,9 +19,19 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import akka.actor.ActorSystem;
-import akka.dispatch.ExecutionContexts$;
 import akka.util.Timeout;
 import com.typesafe.config.Config;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +47,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -61,7 +70,6 @@ import org.apache.flink.util.NetUtils;
 import scala.Tuple2;
 import scala.Option;
 import scala.Some;
-import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -70,9 +78,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.UUID;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -86,6 +94,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The unique resource ID of this TaskExecutor */
 	private final ResourceID resourceID;
 
+	private final TaskManagerLocation taskManagerLocation;
+
 	/** The access to the leader election and metadata storage services */
 	private final HighAvailabilityServices haServices;
 
@@ -113,22 +123,26 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	public TaskExecutor(
 			TaskExecutorConfiguration taskExecutorConfig,
 			ResourceID resourceID,
+			TaskManagerLocation taskManagerLocation,
 			MemoryManager memoryManager,
 			IOManager ioManager,
 			NetworkEnvironment networkEnvironment,
-			int numberOfSlots,
 			RpcService rpcService,
 			HighAvailabilityServices haServices) {
 
 		super(rpcService);
 
+		checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
+
 		this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
 		this.resourceID = checkNotNull(resourceID);
+		this.taskManagerLocation = checkNotNull(taskManagerLocation);
 		this.memoryManager = checkNotNull(memoryManager);
 		this.ioManager = checkNotNull(ioManager);
 		this.networkEnvironment = checkNotNull(networkEnvironment);
-		this.numberOfSlots = checkNotNull(numberOfSlots);
 		this.haServices = checkNotNull(haServices);
+
+		this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
 	}
 
 	// ------------------------------------------------------------------------
@@ -360,10 +374,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 *                                      then a HighAvailabilityServices is constructed from the configuration.
 	 * @param localTaskManagerCommunication     If true, the TaskManager will not initiate the TCP network stack.
-	 * @return An ActorRef to the TaskManager actor.
+	 * @return The TaskExecutor which uses the newly created task manager components.
-	 * @throws org.apache.flink.configuration.IllegalConfigurationException     Thrown, if the given config contains illegal values.
-	 * @throws java.io.IOException      Thrown, if any of the I/O components (such as buffer pools,
+	 * @throws IllegalConfigurationException     Thrown, if the given config contains illegal values.
+	 * @throws IOException      Thrown, if any of the I/O components (such as buffer pools,
 	 *                                       I/O manager, ...) cannot be properly started.
-	 * @throws java.lang.Exception      Thrown is some other error occurs while parsing the configuration
+	 * @throws Exception      Thrown if some other error occurs while parsing the configuration
 	 *                                      or starting the TaskManager components.
 	 */
 	public static TaskExecutor startTaskManagerComponentsAndActor(
@@ -377,19 +391,105 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration(
 			configuration, taskManagerHostname, localTaskManagerCommunication);
 
+		TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
+			resourceID,
+			InetAddress.getByName(taskManagerHostname),
+			taskExecutorConfig,
+			configuration);
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			taskExecutorConfig,
+			resourceID,
+			taskManagerComponents.getTaskManagerLocation(),
+			taskManagerComponents.getMemoryManager(),
+			taskManagerComponents.getIOManager(),
+			taskManagerComponents.getNetworkEnvironment(),
+			rpcService,
+			haServices);
+
+		return taskExecutor;
+	}
+
+	/**
+	 * Creates and returns the task manager components.
+	 *
+	 * @param resourceID resource ID of the task manager
+	 * @param taskManagerAddress address of the task manager
+	 * @param taskExecutorConfig task manager configuration
+	 * @param configuration Flink configuration
+	 * @return task manager components
+	 * @throws Exception if one of the task manager components cannot be created or started
+	 */
+	private static TaskExecutor.TaskManagerComponents createTaskManagerComponents(
+		ResourceID resourceID,
+		InetAddress taskManagerAddress,
+		TaskExecutorConfiguration taskExecutorConfig,
+		Configuration configuration) throws Exception {
 		MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
 
 		// pre-start checks
 		checkTempDirs(taskExecutorConfig.getTmpDirPaths());
 
-		ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+		NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
+
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+			networkEnvironmentConfiguration.numNetworkBuffers(),
+			networkEnvironmentConfiguration.networkBufferSize(),
+			networkEnvironmentConfiguration.memoryType());
+
+		ConnectionManager connectionManager;
+
+		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+			connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
+		} else {
+			connectionManager = new LocalConnectionManager();
+		}
+
+		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+		KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+		KvStateServer kvStateServer;
+
+		if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+			NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
+
+			int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+			int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+				nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+			kvStateServer = new KvStateServer(
+				taskManagerAddress,
+				networkEnvironmentConfiguration.queryServerPort(),
+				numNetworkThreads,
+				numQueryThreads,
+				kvStateRegistry,
+				new DisabledKvStateRequestStats());
+		} else {
+			kvStateServer = null;
+		}
 
 		// we start the network first, to make sure it can allocate its buffers first
 		final NetworkEnvironment network = new NetworkEnvironment(
-			executionContext,
-			taskExecutorConfig.getTimeout(),
-			taskExecutorConfig.getNetworkConfig(),
-			taskExecutorConfig.getConnectionInfo());
+			networkBufferPool,
+			connectionManager,
+			resultPartitionManager,
+			taskEventDispatcher,
+			kvStateRegistry,
+			kvStateServer,
+			networkEnvironmentConfiguration.ioMode(),
+			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+			networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+		network.start();
+
+		TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+			resourceID,
+			taskManagerAddress,
+			network.getConnectionManager().getDataPort());
 
 		// computing the amount of memory to use depends on how much memory is available
 		// it strictly needs to happen AFTER the network stack has been initialized
@@ -473,17 +573,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		// start the I/O manager, it will create some temp directories.
 		final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
 
-		final TaskExecutor taskExecutor = new TaskExecutor(
-			taskExecutorConfig,
-			resourceID,
-			memoryManager,
-			ioManager,
-			network,
-			taskExecutorConfig.getNumberOfSlots(),
-			rpcService,
-			haServices);
-
-		return taskExecutor;
+		return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
 	}
 
 	// --------------------------------------------------------------------------
@@ -519,7 +609,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
 
 		InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
-		final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport);
+		final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
 
 		// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
 
@@ -576,7 +666,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		final NettyConfig nettyConfig;
 		if (!localTaskManagerCommunication) {
-			nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration);
+			nettyConfig = new NettyConfig(
+				taskManagerInetSocketAddress.getAddress(),
+				taskManagerInetSocketAddress.getPort(),
+				pageSize,
+				slots,
+				configuration);
 		} else {
 			nettyConfig = null;
 		}
@@ -613,8 +708,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			queryServerPort,
 			queryServerNetworkThreads,
 			queryServerQueryThreads,
-			localTaskManagerCommunication ? Option.<NettyConfig>empty() : new Some<>(nettyConfig),
-			new Tuple2<>(500, 3000));
+			Option.apply(nettyConfig),
+			500,
+			30000);
 
 		// ----> timeouts, library caching, profiling
 
@@ -695,7 +791,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return new TaskExecutorConfiguration(
 			tmpDirs,
 			cleanupInterval,
-			connectionInfo,
 			networkConfig,
 			timeout,
 			finiteRegistrationDuration,
@@ -829,4 +924,38 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			onFatalErrorAsync(exception);
 		}
 	}
+
+	/**
+	 * Holder for the task manager components created by {@code createTaskManagerComponents}.
+	 *
+	 * <p>The components are handed to the {@link TaskExecutor} constructor afterwards. None of
+	 * them may be null.
+	 */
+	private static class TaskManagerComponents {
+		/** Resource ID, address and data port of the task manager. */
+		private final TaskManagerLocation taskManagerLocation;
+		private final MemoryManager memoryManager;
+		private final IOManager ioManager;
+		/** Network stack; {@code createTaskManagerComponents} starts it before creating this holder. */
+		private final NetworkEnvironment networkEnvironment;
+
+		private TaskManagerComponents(
+				TaskManagerLocation location,
+				MemoryManager memManager,
+				IOManager ioMgr,
+				NetworkEnvironment network) {
+			this.taskManagerLocation = Preconditions.checkNotNull(location);
+			this.memoryManager = Preconditions.checkNotNull(memManager);
+			this.ioManager = Preconditions.checkNotNull(ioMgr);
+			this.networkEnvironment = Preconditions.checkNotNull(network);
+		}
+
+		public TaskManagerLocation getTaskManagerLocation() { return taskManagerLocation; }
+
+		public MemoryManager getMemoryManager() { return memoryManager; }
+
+		public IOManager getIOManager() { return ioManager; }
+
+		public NetworkEnvironment getNetworkEnvironment() { return networkEnvironment; }
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a17c106c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
index 3707a47..c97c893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -52,12 +51,9 @@ public class TaskExecutorConfiguration implements Serializable {
 
 	private final NetworkEnvironmentConfiguration networkConfig;
 
-	private final InstanceConnectionInfo connectionInfo;
-
 	public TaskExecutorConfiguration(
 			String[] tmpDirPaths,
 			long cleanupInterval,
-			InstanceConnectionInfo connectionInfo,
 			NetworkEnvironmentConfiguration networkConfig,
 			FiniteDuration timeout,
 			FiniteDuration maxRegistrationDuration,
@@ -66,7 +62,6 @@ public class TaskExecutorConfiguration implements Serializable {
 
 		this (tmpDirPaths,
 			cleanupInterval,
-			connectionInfo,
 			networkConfig,
 			timeout,
 			maxRegistrationDuration,
@@ -80,7 +75,6 @@ public class TaskExecutorConfiguration implements Serializable {
 	public TaskExecutorConfiguration(
 			String[] tmpDirPaths,
 			long cleanupInterval,
-			InstanceConnectionInfo connectionInfo,
 			NetworkEnvironmentConfiguration networkConfig,
 			FiniteDuration timeout,
 			FiniteDuration maxRegistrationDuration,
@@ -92,7 +86,6 @@ public class TaskExecutorConfiguration implements Serializable {
 
 		this.tmpDirPaths = checkNotNull(tmpDirPaths);
 		this.cleanupInterval = checkNotNull(cleanupInterval);
-		this.connectionInfo = checkNotNull(connectionInfo);
 		this.networkConfig = checkNotNull(networkConfig);
 		this.timeout = checkNotNull(timeout);
 		this.maxRegistrationDuration = maxRegistrationDuration;
@@ -115,8 +108,6 @@ public class TaskExecutorConfiguration implements Serializable {
 		return cleanupInterval;
 	}
 
-	public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; }
-
 	public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; }
 
 	public FiniteDuration getTimeout() {