You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:21:56 UTC
[16/50] [abbrv] flink git commit: [FLINK-4505] [cluster mngt]
Separate TaskManager service configuration from TaskManagerConfiguration;
Implement TaskManagerRunner
[FLINK-4505] [cluster mngt] Separate TaskManager service configuration from TaskManagerConfiguration; Implement TaskManagerRunner
Refactors the startup logic so that it is easier to reuse.
This closes #2461.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d25ea85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d25ea85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d25ea85
Branch: refs/heads/flip-6
Commit: 1d25ea85e8f10f6ab67b4ebe7274236778ee4b0f
Parents: a64e818
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 12:33:15 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:46:25 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServicesUtils.java | 41 +
.../flink/runtime/rpc/RpcServiceUtils.java | 73 ++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 2 +
.../runtime/taskexecutor/TaskExecutor.java | 51 +-
.../taskexecutor/TaskExecutorConfiguration.java | 142 ----
.../taskexecutor/TaskManagerConfiguration.java | 205 +++++
.../runtime/taskexecutor/TaskManagerRunner.java | 172 +++++
.../taskexecutor/TaskManagerServices.java | 320 ++++++++
.../TaskManagerServicesConfiguration.java | 325 ++++++++
.../runtime/taskmanager/TaskManagerRunner.java | 749 -------------------
.../runtime/util/LeaderRetrievalUtils.java | 7 +
.../apache/flink/runtime/akka/AkkaUtils.scala | 4 +
.../NetworkEnvironmentConfiguration.scala | 2 +-
.../flink/runtime/taskmanager/TaskManager.scala | 6 +-
.../io/network/NetworkEnvironmentTest.java | 4 +-
.../runtime/rpc/TestingSerialRpcService.java | 1 -
.../runtime/taskexecutor/TaskExecutorTest.java | 29 +-
...askManagerComponentsStartupShutdownTest.java | 3 +-
.../TaskManagerConfigurationTest.java | 1 -
19 files changed, 1195 insertions(+), 942 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
new file mode 100644
index 0000000..f3da847
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
+/**
+ * Utility class for instantiating {@link HighAvailabilityServices} from a Flink configuration.
+ */
+public class HighAvailabilityServicesUtils {
+
+	/**
+	 * Creates the high availability services that match the recovery mode read from the
+	 * given configuration.
+	 *
+	 * @param configuration configuration from which the recovery mode is determined
+	 * @return the non-HA services if the recovery mode is {@code NONE}
+	 * @throws UnsupportedOperationException if the recovery mode is {@code ZOOKEEPER}
+	 * @throws Exception if the recovery mode is any other value
+	 */
+	public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
+		final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(configuration);
+
+		switch (mode) {
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException(
+					"ZooKeeper high availability services have not been implemented yet.");
+			case NONE: {
+				// The resource manager address is handed over as null in this mode.
+				final String resourceManagerAddress = null;
+				return new NonHaServices(resourceManagerAddress);
+			}
+			default:
+				throw new Exception("Recovery mode " + mode + " is not supported.");
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
new file mode 100644
index 0000000..d40e336
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.NetUtils;
+import org.jboss.netty.channel.ChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Utility class for creating an {@link RpcService} backed by an akka actor system.
+ */
+public class RpcServiceUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
+
+	/**
+	 * Creates an {@link AkkaRpcService} whose actor system is started for the given
+	 * hostname and port.
+	 *
+	 * @param hostname The hostname/address the actor system is created for.
+	 * @param port The port the actor system is created for.
+	 * @param configuration The configuration from which the akka settings and the RPC timeout are read.
+	 * @return The rpc service running on the newly created actor system.
+	 * @throws IOException Thrown, if the actor system can not bind to the address
+	 * @throws Exception Thrown if some other error occurs while creating the akka actor system
+	 */
+	public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+		LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
+
+		final ActorSystem actorSystem;
+
+		try {
+			final Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			LOG.debug("Using akka configuration \n {}.", akkaConfig);
+			actorSystem = AkkaUtils.createActorSystem(akkaConfig);
+		} catch (Throwable t) {
+			// A ChannelException caused by a BindException means the address could not be bound;
+			// report that case as an IOException carrying the offending address.
+			if (t instanceof ChannelException && t.getCause() instanceof java.net.BindException) {
+				final Throwable bindFailure = t.getCause();
+				final String address = NetUtils.hostAndPortToUrlString(hostname, port);
+				throw new IOException("Unable to bind AkkaRpcService actor system to address " +
+					address + " - " + bindFailure.getMessage(), t);
+			}
+			throw new Exception("Could not create TaskManager actor system", t);
+		}
+
+		final long timeoutMillis = AkkaUtils.getTimeout(configuration).toMillis();
+		return new AkkaRpcService(actorSystem, Time.milliseconds(timeoutMillis));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6825557..fb7896a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -93,6 +93,8 @@ public class AkkaRpcService implements RpcService {
Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+
+
if (actorSystemAddress.host().isDefined()) {
address = actorSystemAddress.host().get();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8ce2780..7df0a91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,16 +18,14 @@
package org.apache.flink.runtime.taskexecutor;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.jboss.netty.channel.ChannelException;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -39,7 +37,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.BindException;
+
import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -60,7 +58,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private final HighAvailabilityServices haServices;
/** The task manager configuration */
- private final TaskExecutorConfiguration taskExecutorConfig;
+ private final TaskManagerConfiguration taskManagerConfiguration;
/** The I/O manager component in the task manager */
private final IOManager ioManager;
@@ -71,9 +69,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
/** The network component in the task manager */
private final NetworkEnvironment networkEnvironment;
+ /** The metric registry in the task manager */
+ private final MetricRegistry metricRegistry;
+
/** The number of slots in the task manager, should be 1 for YARN */
private final int numberOfSlots;
+ /** The fatal error handler to use in case of a fatal error */
+ private final FatalErrorHandler fatalErrorHandler;
+
// --------- resource manager --------
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@@ -81,26 +85,30 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ------------------------------------------------------------------------
public TaskExecutor(
- TaskExecutorConfiguration taskExecutorConfig,
+ TaskManagerConfiguration taskManagerConfiguration,
TaskManagerLocation taskManagerLocation,
RpcService rpcService,
MemoryManager memoryManager,
IOManager ioManager,
NetworkEnvironment networkEnvironment,
- HighAvailabilityServices haServices) {
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) {
super(rpcService);
- checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
+ checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
- this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
+ this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
this.taskManagerLocation = checkNotNull(taskManagerLocation);
this.memoryManager = checkNotNull(memoryManager);
this.ioManager = checkNotNull(ioManager);
this.networkEnvironment = checkNotNull(networkEnvironment);
this.haServices = checkNotNull(haServices);
+ this.metricRegistry = checkNotNull(metricRegistry);
+ this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
- this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
+ this.numberOfSlots = taskManagerConfiguration.getNumberSlots();
}
// ------------------------------------------------------------------------
@@ -158,6 +166,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
+ /**
* Requests a slot from the TaskManager
*
* @param allocationID id for the request
@@ -169,22 +178,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
return new SlotRequestRegistered(allocationID);
}
- /**
- public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
- return null;
- }
-
- @Override
- return null;
- }
-
- @Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
- return null;
- }
-
- @Override
- public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
@@ -222,7 +215,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
void onFatalError(Throwable t) {
// to be determined, probably delegate to a fatal error handler that
// would either log (mini cluster) ot kill the process (yarn, mesos, ...)
- log.error("FATAL ERROR", t);
+ fatalErrorHandler.onFatalError(t);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
deleted file mode 100644
index c97c893..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * {@link TaskExecutor} Configuration
- */
-public class TaskExecutorConfiguration implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final String[] tmpDirPaths;
-
- private final long cleanupInterval;
-
- private final int numberOfSlots;
-
- private final Configuration configuration;
-
- private final FiniteDuration timeout;
- private final FiniteDuration maxRegistrationDuration;
- private final FiniteDuration initialRegistrationPause;
- private final FiniteDuration maxRegistrationPause;
- private final FiniteDuration refusedRegistrationPause;
-
- private final NetworkEnvironmentConfiguration networkConfig;
-
- public TaskExecutorConfiguration(
- String[] tmpDirPaths,
- long cleanupInterval,
- NetworkEnvironmentConfiguration networkConfig,
- FiniteDuration timeout,
- FiniteDuration maxRegistrationDuration,
- int numberOfSlots,
- Configuration configuration) {
-
- this (tmpDirPaths,
- cleanupInterval,
- networkConfig,
- timeout,
- maxRegistrationDuration,
- numberOfSlots,
- configuration,
- new FiniteDuration(500, TimeUnit.MILLISECONDS),
- new FiniteDuration(30, TimeUnit.SECONDS),
- new FiniteDuration(10, TimeUnit.SECONDS));
- }
-
- public TaskExecutorConfiguration(
- String[] tmpDirPaths,
- long cleanupInterval,
- NetworkEnvironmentConfiguration networkConfig,
- FiniteDuration timeout,
- FiniteDuration maxRegistrationDuration,
- int numberOfSlots,
- Configuration configuration,
- FiniteDuration initialRegistrationPause,
- FiniteDuration maxRegistrationPause,
- FiniteDuration refusedRegistrationPause) {
-
- this.tmpDirPaths = checkNotNull(tmpDirPaths);
- this.cleanupInterval = checkNotNull(cleanupInterval);
- this.networkConfig = checkNotNull(networkConfig);
- this.timeout = checkNotNull(timeout);
- this.maxRegistrationDuration = maxRegistrationDuration;
- this.numberOfSlots = checkNotNull(numberOfSlots);
- this.configuration = checkNotNull(configuration);
- this.initialRegistrationPause = checkNotNull(initialRegistrationPause);
- this.maxRegistrationPause = checkNotNull(maxRegistrationPause);
- this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause);
- }
-
- // --------------------------------------------------------------------------------------------
- // Properties
- // --------------------------------------------------------------------------------------------
-
- public String[] getTmpDirPaths() {
- return tmpDirPaths;
- }
-
- public long getCleanupInterval() {
- return cleanupInterval;
- }
-
- public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; }
-
- public FiniteDuration getTimeout() {
- return timeout;
- }
-
- public FiniteDuration getMaxRegistrationDuration() {
- return maxRegistrationDuration;
- }
-
- public int getNumberOfSlots() {
- return numberOfSlots;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-
- public FiniteDuration getInitialRegistrationPause() {
- return initialRegistrationPause;
- }
-
- public FiniteDuration getMaxRegistrationPause() {
- return maxRegistrationPause;
- }
-
- public FiniteDuration getRefusedRegistrationPause() {
- return refusedRegistrationPause;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
new file mode 100644
index 0000000..32eb8c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.io.File;
+
+/**
+ * Configuration object for {@link TaskExecutor}.
+ */
+public class TaskManagerConfiguration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);
+
+ private final int numberSlots;
+
+ private final String[] tmpDirPaths;
+
+ private final Time timeout;
+ private final Time maxRegistrationDuration;
+ private final Time initialRegistrationPause;
+ private final Time maxRegistrationPause;
+ private final Time refusedRegistrationPause;
+
+ private final long cleanupInterval;
+
+ public TaskManagerConfiguration(
+ int numberSlots,
+ String[] tmpDirPaths,
+ Time timeout,
+ Time maxRegistrationDuration,
+ Time initialRegistrationPause,
+ Time maxRegistrationPause,
+ Time refusedRegistrationPause,
+ long cleanupInterval) {
+
+ this.numberSlots = numberSlots;
+ this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
+ this.timeout = Preconditions.checkNotNull(timeout);
+ this.maxRegistrationDuration = Preconditions.checkNotNull(maxRegistrationDuration);
+ this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
+ this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
+ this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
+ this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
+ }
+
+ public int getNumberSlots() {
+ return numberSlots;
+ }
+
+ public String[] getTmpDirPaths() {
+ return tmpDirPaths;
+ }
+
+ public Time getTimeout() {
+ return timeout;
+ }
+
+ public Time getMaxRegistrationDuration() {
+ return maxRegistrationDuration;
+ }
+
+ public Time getInitialRegistrationPause() {
+ return initialRegistrationPause;
+ }
+
+ public Time getMaxRegistrationPause() {
+ return maxRegistrationPause;
+ }
+
+ public Time getRefusedRegistrationPause() {
+ return refusedRegistrationPause;
+ }
+
+ public long getCleanupInterval() {
+ return cleanupInterval;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Static factory methods
+ // --------------------------------------------------------------------------------------------
+
+ public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
+ int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+ if (numberSlots == -1) {
+ numberSlots = 1;
+ }
+
+ final String[] tmpDirPaths = configuration.getString(
+ ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+ final Time timeout;
+
+ try {
+ timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+ "'.Use formats like '50 s' or '1 min' to specify the timeout.");
+ }
+
+ LOG.info("Messages have a max timeout of " + timeout);
+
+ final long cleanupInterval = configuration.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+ final Time finiteRegistrationDuration;
+
+ try {
+ Duration maxRegistrationDuration = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+ if (maxRegistrationDuration.isFinite()) {
+ finiteRegistrationDuration = Time.seconds(maxRegistrationDuration.toSeconds());
+ } else {
+ finiteRegistrationDuration = null;
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+ }
+
+ final Time initialRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ initialRegistrationPause = Time.seconds(pause.toSeconds());
+ } else {
+ throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ final Time maxRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ maxRegistrationPause = Time.seconds(pause.toSeconds());
+ } else {
+ throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ final Time refusedRegistrationPause;
+ try {
+ Duration pause = Duration.create(configuration.getString(
+ ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
+ ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
+ if (pause.isFinite()) {
+ refusedRegistrationPause = Time.seconds(pause.toSeconds());
+ } else {
+ throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
+ }
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid format for parameter " +
+ ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+ }
+
+ return new TaskManagerConfiguration(
+ numberSlots,
+ tmpDirPaths,
+ timeout,
+ finiteRegistrationDuration,
+ initialRegistrationPause,
+ maxRegistrationPause,
+ refusedRegistrationPause,
+ cleanupInterval);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
new file mode 100644
index 0000000..8ac0ddd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is the executable entry point for the task manager in yarn or standalone mode.
+ * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
+ * and starts them.
+ *
+ * <p>The runner also acts as the {@link FatalErrorHandler} of the {@link TaskExecutor} it creates:
+ * a fatal error is logged and the task manager is shut down.
+ */
+public class TaskManagerRunner implements FatalErrorHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+
+	/** Lock guarding the shut down of the task manager. */
+	private final Object lock = new Object();
+
+	private final Configuration configuration;
+
+	private final ResourceID resourceID;
+
+	private final RpcService rpcService;
+
+	private final HighAvailabilityServices highAvailabilityServices;
+
+	/** Executor used to run future callbacks */
+	private final Executor executor;
+
+	private final TaskExecutor taskManager;
+
+	/**
+	 * Creates the task manager services from the configuration and wires them into a new
+	 * {@link TaskExecutor}. The task executor is not started until {@link #start()} is called.
+	 *
+	 * @param configuration configuration for the task manager and its services, must not be null
+	 * @param resourceID resource id of the task manager, must not be null
+	 * @param rpcService rpc service used by the task executor, must not be null
+	 * @param highAvailabilityServices high availability services, must not be null
+	 * @param executor currently unused, see the note in the constructor body
+	 * @throws Exception if the address of the rpc service cannot be resolved or the services
+	 *                   cannot be created
+	 */
+	public TaskManagerRunner(
+		Configuration configuration,
+		ResourceID resourceID,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		Executor executor) throws Exception {
+
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.resourceID = Preconditions.checkNotNull(resourceID);
+		this.rpcService = Preconditions.checkNotNull(rpcService);
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		// NOTE(review): the executor argument is ignored and the rpc service's executor is
+		// used instead - confirm whether this is intended before changing it.
+		this.executor = rpcService.getExecutor();
+
+		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
+
+		TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(
+			configuration,
+			remoteAddress,
+			false);
+
+		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, resourceID);
+
+		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+
+		this.taskManager = new TaskExecutor(
+			taskManagerConfiguration,
+			taskManagerServices.getTaskManagerLocation(),
+			rpcService,
+			taskManagerServices.getMemoryManager(),
+			taskManagerServices.getIOManager(),
+			taskManagerServices.getNetworkEnvironment(),
+			highAvailabilityServices,
+			taskManagerServices.getMetricRegistry(),
+			this);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Lifecycle management
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Starts the task executor.
+	 */
+	public void start() {
+		taskManager.start();
+	}
+
+	/**
+	 * Shuts the task executor down.
+	 *
+	 * @param cause the reason for the shut down; it is currently not evaluated
+	 */
+	public void shutDown(Throwable cause) {
+		shutDownInternally();
+	}
+
+	/**
+	 * Shuts the task executor down while holding the runner's lock.
+	 */
+	protected void shutDownInternally() {
+		synchronized(lock) {
+			taskManager.shutDown();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  FatalErrorHandler methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Logs the fatal error and shuts the task manager down.
+	 *
+	 * @param exception the fatal error
+	 */
+	@Override
+	public void onFatalError(Throwable exception) {
+		LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
+		shutDown(exception);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Create a RPC service for the task manager.
+	 *
+	 * <p>If no hostname is configured, the hostname is determined by finding an address from
+	 * which the resource manager leader can be reached.
+	 *
+	 * @param configuration The configuration for the TaskManager.
+	 * @param haServices to use for the task manager hostname retrieval
+	 * @return the rpc service created for the determined hostname and the configured port
+	 * @throws IllegalStateException if the configured port is outside of [0, 65535]
+	 * @throws Exception if the hostname cannot be determined or the rpc service cannot be created
+	 */
+	public static RpcService createRpcService(
+		final Configuration configuration,
+		final HighAvailabilityServices haServices) throws Exception {
+
+		checkNotNull(configuration);
+		checkNotNull(haServices);
+
+		String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+
+		if (taskManagerHostname != null) {
+			LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
+		} else {
+			Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
+
+			InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
+				haServices.getResourceManagerLeaderRetriever(),
+				lookupTimeout);
+
+			taskManagerHostname = taskManagerAddress.getHostName();
+
+			LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
+				taskManagerHostname, taskManagerAddress.getHostAddress());
+		}
+
+		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
+
+		// checkState fails when the condition is false, so the condition has to describe the
+		// VALID port range (0 lets the system choose a port).
+		Preconditions.checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
+			"'%s' (port for the TaskManager actor system) : %s - Leave config parameter empty or " +
+			"use 0 to let the system choose port automatically.",
+			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
+
+		return RpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
new file mode 100644
index 0000000..ff7f7d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
+ * {@link NetworkEnvironment} and the {@link MetricRegistry}.
+ */
+public class TaskManagerServices {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
+
+    /** TaskManager services */
+    private final TaskManagerLocation taskManagerLocation;
+    private final MemoryManager memoryManager;
+    private final IOManager ioManager;
+    private final NetworkEnvironment networkEnvironment;
+    private final MetricRegistry metricRegistry;
+
+    /**
+     * Bundles the already created services. All arguments must be non-null.
+     * Instances are obtained through {@link #fromConfiguration}.
+     */
+    private TaskManagerServices(
+            TaskManagerLocation taskManagerLocation,
+            MemoryManager memoryManager,
+            IOManager ioManager,
+            NetworkEnvironment networkEnvironment,
+            MetricRegistry metricRegistry) {
+
+        this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+        this.memoryManager = Preconditions.checkNotNull(memoryManager);
+        this.ioManager = Preconditions.checkNotNull(ioManager);
+        this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+        this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Getters
+    // --------------------------------------------------------------------------------------------
+
+    public MemoryManager getMemoryManager() {
+        return memoryManager;
+    }
+
+    public IOManager getIOManager() {
+        return ioManager;
+    }
+
+    public NetworkEnvironment getNetworkEnvironment() {
+        return networkEnvironment;
+    }
+
+    public TaskManagerLocation getTaskManagerLocation() {
+        return taskManagerLocation;
+    }
+
+    public MetricRegistry getMetricRegistry() {
+        return metricRegistry;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Static factory methods for task manager services
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates and returns the task manager services.
+     *
+     * <p>The network environment is created and started first; the memory manager, the I/O
+     * manager and the metric registry are created afterwards, in that order.
+     *
+     * @param taskManagerServicesConfiguration task manager configuration
+     * @param resourceID resource ID of the task manager
+     * @return task manager components
+     * @throws Exception if one of the services cannot be created or started
+     */
+    public static TaskManagerServices fromConfiguration(
+            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
+            ResourceID resourceID) throws Exception {
+
+        final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration);
+
+        network.start();
+
+        // the data port is read from the connection manager only after the network was started
+        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
+            resourceID,
+            taskManagerServicesConfiguration.getTaskManagerAddress(),
+            network.getConnectionManager().getDataPort());
+
+        // this call has to happen strictly after the network stack has been initialized
+        final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
+
+        // start the I/O manager, it will create some temp directories.
+        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
+
+        MetricRegistry metricsRegistry = new MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+
+        return new TaskManagerServices(taskManagerLocation, memoryManager, ioManager, network, metricsRegistry);
+    }
+
+    /**
+     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
+     *
+     * <p>If a positive memory size (in megabytes) is configured, it is used directly. Otherwise
+     * the size is derived from the configured memory fraction: for heap memory from the
+     * currently free heap, for off-heap memory from the maximum JVM heap size.
+     *
+     * @param taskManagerServicesConfiguration to create the memory manager from
+     * @return Memory manager
+     * @throws Exception if the memory manager runs out of memory while being created
+     */
+    private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {
+        // computing the amount of memory to use depends on how much memory is available
+        // it strictly needs to happen AFTER the network stack has been initialized
+
+        MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType();
+
+        // check if a value has been configured
+        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
+
+        final long memorySize;
+
+        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
+
+        if (configuredMemory > 0) {
+            if (preAllocateMemory) {
+                LOG.info("Using {} MB for managed memory." , configuredMemory);
+            } else {
+                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
+            }
+            memorySize = configuredMemory << 20; // megabytes to bytes
+        } else {
+            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
+
+            if (memType == MemoryType.HEAP) {
+                long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
+                if (preAllocateMemory) {
+                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
+                        memoryFraction , relativeMemSize >> 20);
+                } else {
+                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
+                        "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
+                }
+                memorySize = relativeMemSize;
+            } else if (memType == MemoryType.OFF_HEAP) {
+                // The maximum heap memory has been adjusted according to the fraction
+                // (heap = total * (1 - fraction)), so the total is recovered from the heap
+                // size before taking the fraction of it
+                long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
+                long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
+                if (preAllocateMemory) {
+                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
+                        memoryFraction, directMemorySize >> 20);
+                } else {
+                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
+                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
+                }
+                memorySize = directMemorySize;
+            } else {
+                throw new RuntimeException("No supported memory type detected.");
+            }
+        }
+
+        // now start the memory manager
+        final MemoryManager memoryManager;
+        try {
+            memoryManager = new MemoryManager(
+                memorySize,
+                taskManagerServicesConfiguration.getNumberOfSlots(),
+                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
+                memType,
+                preAllocateMemory);
+        } catch (OutOfMemoryError e) {
+            // translate the error into an exception that names the memory type that was exhausted
+            if (memType == MemoryType.HEAP) {
+                throw new Exception("OutOfMemory error (" + e.getMessage() +
+                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
+            } else if (memType == MemoryType.OFF_HEAP) {
+                throw new Exception("OutOfMemory error (" + e.getMessage() +
+                    ") while allocating the TaskManager off-heap memory (" + memorySize +
+                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
+            } else {
+                throw e;
+            }
+        }
+        return memoryManager;
+    }
+
+    /**
+     * Creates the {@link NetworkEnvironment} from the given {@link TaskManagerServicesConfiguration}.
+     *
+     * <p>The temp directories are validated first. The returned environment is only
+     * constructed here, it is not started.
+     *
+     * @param taskManagerServicesConfiguration to construct the network environment from
+     * @return Network environment
+     * @throws IOException if one of the configured temp directories is not usable
+     */
+    private static NetworkEnvironment createNetworkEnvironment(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws IOException {
+        // pre-start checks
+        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
+
+        NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();
+
+        NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+            networkEnvironmentConfiguration.numNetworkBuffers(),
+            networkEnvironmentConfiguration.networkBufferSize(),
+            networkEnvironmentConfiguration.memoryType());
+
+        ConnectionManager connectionManager;
+
+        // without a netty configuration a LocalConnectionManager is used instead of netty
+        if (networkEnvironmentConfiguration.nettyConfig() != null) {
+            connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig());
+        } else {
+            connectionManager = new LocalConnectionManager();
+        }
+
+        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
+
+        KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+        KvStateServer kvStateServer;
+
+        // the KvState server is only created when a netty configuration is present
+        if (networkEnvironmentConfiguration.nettyConfig() != null) {
+            NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
+
+            // a configured thread count of 0 falls back to the number of slots
+            int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ?
+                nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads();
+
+            int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ?
+                nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads();
+
+            kvStateServer = new KvStateServer(
+                taskManagerServicesConfiguration.getTaskManagerAddress(),
+                networkEnvironmentConfiguration.queryServerPort(),
+                numNetworkThreads,
+                numQueryThreads,
+                kvStateRegistry,
+                new DisabledKvStateRequestStats());
+        } else {
+            kvStateServer = null;
+        }
+
+        // the network environment is created before the memory manager, to make sure it can
+        // allocate its buffers first
+        final NetworkEnvironment network = new NetworkEnvironment(
+            networkBufferPool,
+            connectionManager,
+            resultPartitionManager,
+            taskEventDispatcher,
+            kvStateRegistry,
+            kvStateServer,
+            networkEnvironmentConfiguration.ioMode(),
+            networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
+            networkEnvironmentConfiguration.partitinRequestMaxBackoff());
+
+        return network;
+    }
+
+    /**
+     * Validates that all the directories denoted by the strings do actually exist, are proper
+     * directories (not files), and are writable.
+     *
+     * @param tmpDirs The array of directory paths to check.
+     * @throws IOException Thrown if any of the directories does not exist or is not writable
+     *                     or is a file, rather than a directory.
+     * @throws IllegalArgumentException Thrown if any of the entries is null or an empty string.
+     */
+    private static void checkTempDirs(String[] tmpDirs) throws IOException {
+        for (int i = 0; i < tmpDirs.length; i++) {
+            final String dir = tmpDirs[i];
+
+            if (dir == null || dir.equals("")) {
+                // report the (zero-based) position of the offending entry
+                throw new IllegalArgumentException("Temporary file directory #" + i + " is null or empty.");
+            }
+
+            File file = new File(dir);
+            if (!file.exists()) {
+                throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist.");
+            }
+            if (!file.isDirectory()) {
+                throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory.");
+            }
+            if (!file.canWrite()) {
+                throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable.");
+            }
+
+            if (LOG.isInfoEnabled()) {
+                long totalBytes = file.getTotalSpace();
+                long usableBytes = file.getUsableSpace();
+                // computed from bytes (not from the GB-truncated values) so that small volumes
+                // do not yield NaN or a distorted ratio; guards against a reported size of 0
+                double usablePercentage = totalBytes > 0 ? 100.0 * usableBytes / totalBytes : 0.0;
+                String path = file.getAbsolutePath();
+                LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
+                    path, totalBytes >> 30, usableBytes >> 30, usablePercentage));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
new file mode 100644
index 0000000..66d969a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.util.MathUtils;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configuration for the task manager services such as the network environment, the memory manager,
+ * the io manager and the metric registry
+ */
+public class TaskManagerServicesConfiguration {
+
+    private final InetAddress taskManagerAddress;
+
+    private final String[] tmpDirPaths;
+
+    private final int numberOfSlots;
+
+    private final NetworkEnvironmentConfiguration networkConfig;
+
+    /** Configured managed memory size in megabytes; -1 when nothing was configured. */
+    private final long configuredMemory;
+
+    private final boolean preAllocateMemory;
+
+    private final float memoryFraction;
+
+    private final MetricRegistryConfiguration metricRegistryConfiguration;
+
+    /**
+     * Creates the configuration from already parsed values. The address, the temp directory
+     * paths, the network configuration and the metric registry configuration must be non-null.
+     */
+    public TaskManagerServicesConfiguration(
+            InetAddress taskManagerAddress,
+            String[] tmpDirPaths,
+            NetworkEnvironmentConfiguration networkConfig,
+            int numberOfSlots,
+            long configuredMemory,
+            boolean preAllocateMemory,
+            float memoryFraction,
+            MetricRegistryConfiguration metricRegistryConfiguration) {
+
+        this.taskManagerAddress = checkNotNull(taskManagerAddress);
+        this.tmpDirPaths = checkNotNull(tmpDirPaths);
+        this.networkConfig = checkNotNull(networkConfig);
+        // primitive int: a null check would only box and unbox the value
+        this.numberOfSlots = numberOfSlots;
+
+        this.configuredMemory = configuredMemory;
+        this.preAllocateMemory = preAllocateMemory;
+        this.memoryFraction = memoryFraction;
+
+        this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Getters
+    // --------------------------------------------------------------------------------------------
+
+    public InetAddress getTaskManagerAddress() {
+        return taskManagerAddress;
+    }
+
+    public String[] getTmpDirPaths() {
+        return tmpDirPaths;
+    }
+
+    public NetworkEnvironmentConfiguration getNetworkConfig() {
+        return networkConfig;
+    }
+
+    public int getNumberOfSlots() {
+        return numberOfSlots;
+    }
+
+    public float getMemoryFraction() {
+        return memoryFraction;
+    }
+
+    public long getConfiguredMemory() {
+        return configuredMemory;
+    }
+
+    public boolean isPreAllocateMemory() {
+        return preAllocateMemory;
+    }
+
+    public MetricRegistryConfiguration getMetricRegistryConfiguration() {
+        return metricRegistryConfiguration;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Parsing of Flink configuration
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Utility method to extract TaskManager config parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration The configuration.
+     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
+     * @param localCommunication True, to create no netty configuration for the network stack.
+     *                           Use only in cases where only one task manager runs.
+     * @return TaskManagerServicesConfiguration holding the parsed address, temp directories,
+     *         network, memory and metric registry settings
+     * @throws IllegalConfigurationException if a configuration value fails its sanity check
+     * @throws Exception if the network environment configuration cannot be created
+     */
+    public static TaskManagerServicesConfiguration fromConfiguration(
+            Configuration configuration,
+            InetAddress remoteAddress,
+            boolean localCommunication) throws Exception {
+
+        // we need this because many configs have been written with a "-1" entry
+        int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+        if (slots == -1) {
+            slots = 1;
+        }
+
+        // several temp directories may be given, separated by ',' or the platform path separator
+        final String[] tmpDirs = configuration.getString(
+            ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
+            configuration,
+            localCommunication,
+            remoteAddress,
+            slots);
+
+        // extract memory settings; -1 means "not configured"
+        long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+        checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
+            ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+            "MemoryManager needs at least one MB of memory. " +
+                "If you leave this config parameter empty, the system automatically " +
+                "pick a fraction of the available memory.");
+
+        boolean preAllocateMemory = configuration.getBoolean(
+            ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+
+        // the fraction has to lie strictly between 0 and 1
+        float memoryFraction = configuration.getFloat(
+            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+            ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
+            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+        final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
+
+        return new TaskManagerServicesConfiguration(
+            remoteAddress,
+            tmpDirs,
+            networkConfig,
+            slots,
+            configuredMemory,
+            preAllocateMemory,
+            memoryFraction,
+            metricRegistryConfiguration);
+    }
+
+    // --------------------------------------------------------------------------
+    //  Parsing and checking the TaskManager Configuration
+    // --------------------------------------------------------------------------
+
+    /**
+     * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
+     *
+     * <p>As a side effect, the {@link MemorySegmentFactory} is initialized for the configured
+     * memory type if it has not been initialized yet.
+     *
+     * @param configuration to create the network environment configuration from
+     * @param localTaskManagerCommunication true if task manager communication is local
+     * @param taskManagerAddress address of the task manager
+     * @param slots to start the task manager with
+     * @return Network environment configuration
+     * @throws IllegalConfigurationException if a configuration value fails its sanity check
+     * @throws Exception if the memory segment factory has already been initialized for a
+     *                   different memory type
+     */
+    private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
+            Configuration configuration,
+            boolean localTaskManagerCommunication,
+            InetAddress taskManagerAddress,
+            int slots) throws Exception {
+
+        // ----> hosts / ports for communication and data exchange
+
+        int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+
+        // 0 is a legal value: it asks the system to choose a port automatically
+        checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+            "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+        checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+            "Number of task slots must be at least one.");
+
+        final int numNetworkBuffers = configuration.getInteger(
+            ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+        checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
+            ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+
+        final int pageSize = configuration.getInteger(
+            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+
+        // check page size for minimum size
+        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
+            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
+
+        // check page size for power of two
+        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
+            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+            "Memory segment size must be a power of 2.");
+
+        // check whether we use heap or off-heap memory
+        final MemoryType memType;
+        if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+            memType = MemoryType.OFF_HEAP;
+        } else {
+            memType = MemoryType.HEAP;
+        }
+
+        // initialize the memory segment factory accordingly
+        if (memType == MemoryType.HEAP) {
+            if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
+                throw new Exception("Memory type is set to heap memory, but memory segment " +
+                    "factory has been initialized for off-heap memory segments");
+            }
+        } else {
+            if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
+                throw new Exception("Memory type is set to off-heap memory, but memory segment " +
+                    "factory has been initialized for heap memory segments");
+            }
+        }
+
+        // a netty configuration is only created for non-local communication
+        final NettyConfig nettyConfig;
+        if (!localTaskManagerCommunication) {
+            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
+                taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
+        } else {
+            nettyConfig = null;
+        }
+
+        // Default spill I/O mode for intermediate results
+        final String syncOrAsync = configuration.getString(
+            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
+            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
+
+        // every value other than "async" selects the synchronous mode
+        final IOManager.IOMode ioMode;
+        if (syncOrAsync.equals("async")) {
+            ioMode = IOManager.IOMode.ASYNC;
+        } else {
+            ioMode = IOManager.IOMode.SYNC;
+        }
+
+        final int queryServerPort = configuration.getInteger(
+            ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
+            ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);
+
+        final int queryServerNetworkThreads = configuration.getInteger(
+            ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
+            ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);
+
+        final int queryServerQueryThreads = configuration.getInteger(
+            ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
+            ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);
+
+        // NOTE(review): the trailing 500 / 3000 are presumably the partition request initial
+        // and maximum backoff -- confirm against NetworkEnvironmentConfiguration
+        return new NetworkEnvironmentConfiguration(
+            numNetworkBuffers,
+            pageSize,
+            memType,
+            ioMode,
+            queryServerPort,
+            queryServerNetworkThreads,
+            queryServerQueryThreads,
+            nettyConfig,
+            500,
+            3000);
+    }
+
+    /**
+     * Validates a condition for a config parameter and throws a standard exception, if the
+     * condition does not hold.
+     *
+     * @param condition The condition that must hold. If the condition is false, an exception is thrown.
+     * @param parameter The parameter value. Will be shown in the exception message.
+     * @param name The name of the config parameter. Will be shown in the exception message.
+     * @param errorMessage The optional custom error message to append to the exception message.
+     * @throws IllegalConfigurationException if the condition is false
+     */
+    private static void checkConfigParameter(
+            boolean condition,
+            Object parameter,
+            String name,
+            String errorMessage) {
+        if (!condition) {
+            throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
+        }
+    }
+}
+