You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:00 UTC
[18/50] [abbrv] flink git commit: [hotfix] [taskmanager] Fixes
TaskManager component creation at startup
[hotfix] [taskmanager] Fixes TaskManager component creation at startup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c07278a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c07278a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c07278a
Branch: refs/heads/flip-6
Commit: 9c07278a547c7231d9761a8841fa03dfb554c76c
Parents: 3be561f
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 8 18:43:15 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 189 ++++++++++++++++---
.../taskexecutor/TaskExecutorConfiguration.java | 9 -
2 files changed, 159 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9c07278a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 735730b..a455fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,9 +19,19 @@
package org.apache.flink.runtime.taskexecutor;
import akka.actor.ActorSystem;
-import akka.dispatch.ExecutionContexts$;
import akka.util.Timeout;
import com.typesafe.config.Config;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +47,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -61,7 +70,6 @@ import org.apache.flink.util.NetUtils;
import scala.Tuple2;
import scala.Option;
import scala.Some;
-import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@@ -70,9 +78,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.UUID;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -86,6 +94,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
/** The unique resource ID of this TaskExecutor */
private final ResourceID resourceID;
+ private final TaskManagerLocation taskManagerLocation;
+
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
@@ -113,22 +123,26 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public TaskExecutor(
TaskExecutorConfiguration taskExecutorConfig,
ResourceID resourceID,
+ TaskManagerLocation taskManagerLocation,
MemoryManager memoryManager,
IOManager ioManager,
NetworkEnvironment networkEnvironment,
- int numberOfSlots,
RpcService rpcService,
HighAvailabilityServices haServices) {
super(rpcService);
+ checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
+
this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+ this.taskManagerLocation = checkNotNull(taskManagerLocation);
this.memoryManager = checkNotNull(memoryManager);
this.ioManager = checkNotNull(ioManager);
this.networkEnvironment = checkNotNull(networkEnvironment);
- this.numberOfSlots = checkNotNull(numberOfSlots);
this.haServices = checkNotNull(haServices);
+
+ this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
}
// ------------------------------------------------------------------------
@@ -360,10 +374,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
* then a HighAvailabilityServices is constructed from the configuration.
* @param localTaskManagerCommunication If true, the TaskManager will not initiate the TCP network stack.
* @return An ActorRef to the TaskManager actor.
- * @throws org.apache.flink.configuration.IllegalConfigurationException Thrown, if the given config contains illegal values.
- * @throws java.io.IOException Thrown, if any of the I/O components (such as buffer pools,
+ * @throws IllegalConfigurationException Thrown, if the given config contains illegal values.
+ * @throws IOException Thrown, if any of the I/O components (such as buffer pools,
* I/O manager, ...) cannot be properly started.
- * @throws java.lang.Exception Thrown is some other error occurs while parsing the configuration
+ * @throws Exception Thrown if some other error occurs while parsing the configuration
* or starting the TaskManager components.
*/
public static TaskExecutor startTaskManagerComponentsAndActor(
@@ -377,19 +391,105 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
final TaskExecutorConfiguration taskExecutorConfig = parseTaskManagerConfiguration(
configuration, taskManagerHostname, localTaskManagerCommunication);
+ TaskManagerComponents taskManagerComponents = createTaskManagerComponents(
+ resourceID,
+ InetAddress.getByName(taskManagerHostname),
+ taskExecutorConfig,
+ configuration);
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ taskExecutorConfig,
+ resourceID,
+ taskManagerComponents.getTaskManagerLocation(),
+ taskManagerComponents.getMemoryManager(),
+ taskManagerComponents.getIOManager(),
+ taskManagerComponents.getNetworkEnvironment(),
+ rpcService,
+ haServices);
+
+ return taskExecutor;
+ }
+
+ /**
+ * Creates and returns the task manager components.
+ *
+ * @param resourceID resource ID of the task manager
+ * @param taskManagerAddress address of the task manager
+ * @param taskExecutorConfig task manager configuration
+ * @param configuration the Flink configuration
+ * @return task manager components
+ * @throws Exception if creating or starting any of the task manager components fails
+ */
+ private static TaskExecutor.TaskManagerComponents createTaskManagerComponents(
+ ResourceID resourceID,
+ InetAddress taskManagerAddress,
+ TaskExecutorConfiguration taskExecutorConfig,
+ Configuration configuration) throws Exception {
MemoryType memType = taskExecutorConfig.getNetworkConfig().memoryType();
// pre-start checks
checkTempDirs(taskExecutorConfig.getTmpDirPaths());
- ExecutionContext executionContext = ExecutionContexts$.MODULE$.fromExecutor(new ForkJoinPool());
+ NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskExecutorConfig.getNetworkConfig();
+
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+ networkEnvironmentConfiguration.numNetworkBuffers(),
+ networkEnvironmentConfiguration.networkBufferSize(),
+ networkEnvironmentConfiguration.memoryType());
+
+ ConnectionManager connectionManager;
+
+ if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+ connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig().get());
+ } else {
+ connectionManager = new LocalConnectionManager();
+ }
+
+ ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+ TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+ KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+ KvStateServer kvStateServer;
+
+ if (networkEnvironmentConfiguration.nettyConfig().isDefined()) {
+ NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig().get();
+
+ int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+ nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+ int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+ nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+ kvStateServer = new KvStateServer(
+ taskManagerAddress,
+ networkEnvironmentConfiguration.queryServerPort(),
+ numNetworkThreads,
+ numQueryThreads,
+ kvStateRegistry,
+ new DisabledKvStateRequestStats());
+ } else {
+ kvStateServer = null;
+ }
// we start the network first, to make sure it can allocate its buffers first
final NetworkEnvironment network = new NetworkEnvironment(
- executionContext,
- taskExecutorConfig.getTimeout(),
- taskExecutorConfig.getNetworkConfig(),
- taskExecutorConfig.getConnectionInfo());
+ networkBufferPool,
+ connectionManager,
+ resultPartitionManager,
+ taskEventDispatcher,
+ kvStateRegistry,
+ kvStateServer,
+ networkEnvironmentConfiguration.ioMode(),
+ networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+ networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+ network.start();
+
+ TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+ resourceID,
+ taskManagerAddress,
+ network.getConnectionManager().getDataPort());
// computing the amount of memory to use depends on how much memory is available
// it strictly needs to happen AFTER the network stack has been initialized
@@ -473,17 +573,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskExecutorConfig.getTmpDirPaths());
- final TaskExecutor taskExecutor = new TaskExecutor(
- taskExecutorConfig,
- resourceID,
- memoryManager,
- ioManager,
- network,
- taskExecutorConfig.getNumberOfSlots(),
- rpcService,
- haServices);
-
- return taskExecutor;
+ return new TaskExecutor.TaskManagerComponents(taskManagerLocation, memoryManager, ioManager, network);
}
// --------------------------------------------------------------------------
@@ -519,7 +609,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
InetAddress taskManagerAddress = InetAddress.getByName(taskManagerHostname);
- final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(taskManagerAddress, dataport);
+ final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
// ----> memory / network stack (shuffles/broadcasts), task slots, temp directories
@@ -576,7 +666,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
- nettyConfig = new NettyConfig(connectionInfo.address(), connectionInfo.dataPort(), pageSize, slots, configuration);
+ nettyConfig = new NettyConfig(
+ taskManagerInetSocketAddress.getAddress(),
+ taskManagerInetSocketAddress.getPort(),
+ pageSize,
+ slots,
+ configuration);
} else {
nettyConfig = null;
}
@@ -613,8 +708,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
queryServerPort,
queryServerNetworkThreads,
queryServerQueryThreads,
- localTaskManagerCommunication ? Option.<NettyConfig>empty() : new Some<>(nettyConfig),
- new Tuple2<>(500, 3000));
+ Option.apply(nettyConfig),
+ 500,
+ 30000);
// ----> timeouts, library caching, profiling
@@ -695,7 +791,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
return new TaskExecutorConfiguration(
tmpDirs,
cleanupInterval,
- connectionInfo,
networkConfig,
timeout,
finiteRegistrationDuration,
@@ -829,4 +924,38 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
onFatalErrorAsync(exception);
}
}
+
+ private static class TaskManagerComponents {
+ private final TaskManagerLocation taskManagerLocation;
+ private final MemoryManager memoryManager;
+ private final IOManager ioManager;
+ private final NetworkEnvironment networkEnvironment;
+
+ private TaskManagerComponents(
+ TaskManagerLocation taskManagerLocation,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ NetworkEnvironment networkEnvironment) {
+ this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+ this.memoryManager = Preconditions.checkNotNull(memoryManager);
+ this.ioManager = Preconditions.checkNotNull(ioManager);
+ this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+ }
+
+ public MemoryManager getMemoryManager() {
+ return memoryManager;
+ }
+
+ public IOManager getIOManager() {
+ return ioManager;
+ }
+
+ public NetworkEnvironment getNetworkEnvironment() {
+ return networkEnvironment;
+ }
+
+ public TaskManagerLocation getTaskManagerLocation() {
+ return taskManagerLocation;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c07278a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
index 3707a47..c97c893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import scala.concurrent.duration.FiniteDuration;
@@ -52,12 +51,9 @@ public class TaskExecutorConfiguration implements Serializable {
private final NetworkEnvironmentConfiguration networkConfig;
- private final InstanceConnectionInfo connectionInfo;
-
public TaskExecutorConfiguration(
String[] tmpDirPaths,
long cleanupInterval,
- InstanceConnectionInfo connectionInfo,
NetworkEnvironmentConfiguration networkConfig,
FiniteDuration timeout,
FiniteDuration maxRegistrationDuration,
@@ -66,7 +62,6 @@ public class TaskExecutorConfiguration implements Serializable {
this (tmpDirPaths,
cleanupInterval,
- connectionInfo,
networkConfig,
timeout,
maxRegistrationDuration,
@@ -80,7 +75,6 @@ public class TaskExecutorConfiguration implements Serializable {
public TaskExecutorConfiguration(
String[] tmpDirPaths,
long cleanupInterval,
- InstanceConnectionInfo connectionInfo,
NetworkEnvironmentConfiguration networkConfig,
FiniteDuration timeout,
FiniteDuration maxRegistrationDuration,
@@ -92,7 +86,6 @@ public class TaskExecutorConfiguration implements Serializable {
this.tmpDirPaths = checkNotNull(tmpDirPaths);
this.cleanupInterval = checkNotNull(cleanupInterval);
- this.connectionInfo = checkNotNull(connectionInfo);
this.networkConfig = checkNotNull(networkConfig);
this.timeout = checkNotNull(timeout);
this.maxRegistrationDuration = maxRegistrationDuration;
@@ -115,8 +108,6 @@ public class TaskExecutorConfiguration implements Serializable {
return cleanupInterval;
}
- public InstanceConnectionInfo getConnectionInfo() { return connectionInfo; }
-
public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; }
public FiniteDuration getTimeout() {