Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/07/05 13:34:15 UTC

[GitHub] flink pull request #4261: [FLINK-7082] Add generic entry point for session a...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4261

    [FLINK-7082] Add generic entry point for session and per-job clusters

    This PR is based on #4260 and #4259.
    
    This PR introduces a generic entry point `ClusterEntrypoint` for Flink clusters. `ClusterEntrypoint` performs common operations like parsing command line options, loading the Flink configuration and setting up common services (e.g. `RpcService`).
    
    The two subclasses `JobClusterEntrypoint` and `SessionClusterEntrypoint` are the specializations for the per-job and session modes, respectively. The former entrypoint spawns a `ResourceManager`, tries to retrieve the `JobGraph` and then launches a `JobManagerRunner` with the retrieved `JobGraph`. The latter starts a `ResourceManager` and the `Dispatcher` component in order to receive job submissions.
    
    The PR also includes `StandaloneSessionClusterEntrypoint`, a specialization of `SessionClusterEntrypoint`. This class simply spawns the `StandaloneResourceManager` for a standalone session cluster.
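
For readers who want a picture of the layering described above, here is a minimal sketch. It is not the code from the PR: all class names below are hypothetical stand-ins, and the "components" are only strings recorded in a list. It shows the shape only: common steps in the base class, mode-specific components in the subclasses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the base entry point (not the PR's class):
// shared setup lives here, subclasses only pick the components to start.
abstract class EntrypointSketch {
    protected final List<String> started = new ArrayList<>();

    // Template method: common steps first, then the mode-specific part.
    final List<String> startCluster(String[] args) {
        started.add("parse command line (" + args.length + " args)");
        started.add("load configuration");
        started.add("start common services");
        startClusterComponents();
        return started;
    }

    protected abstract void startClusterComponents();
}

// Per-job mode: a resource manager plus a runner for the one retrieved job.
class JobEntrypointSketch extends EntrypointSketch {
    @Override
    protected void startClusterComponents() {
        started.add("ResourceManager");
        started.add("JobManagerRunner");
    }
}

// Session mode: a resource manager plus a dispatcher accepting submissions.
class SessionEntrypointSketch extends EntrypointSketch {
    @Override
    protected void startClusterComponents() {
        started.add("ResourceManager");
        started.add("Dispatcher");
    }
}

class EntrypointSketchDemo {
    public static void main(String[] args) {
        System.out.println(new JobEntrypointSketch().startCluster(args));
        System.out.println(new SessionEntrypointSketch().startCluster(args));
    }
}
```

Both modes share the first three steps and differ only in the last component, which is the part each subclass overrides.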


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink addClusterEntrypoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4261
    
----
commit cba1cb11a3eaaea3511120fa7864028a2c47e6a5
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-04T21:15:52Z

    [FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component
    
    The Dispatcher is responsible for receiving job submissions, persisting the JobGraphs,
    spawning JobManager to execute the jobs and recovering the jobs in case of a master
    failure. This commit adds the basic skeleton including the RPC call for job submission.
    
    Add cleanup logic for finished jobs
    
    Pass BlobService to JobManagerRunner

commit 476239fb52ef45926b0a28b97339158b8095bc3c
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-04T15:47:37Z

    [FLINK-7082] Add generic entry point for session and per-job clusters

commit 4c45f574d66cab076b7711696c0edbb485cb3cd4
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-05T11:38:35Z

    [FLINK-7105] Make ActorSystems non daemonic
    
    In order not to have to wait explicitly for the termination of an ActorSystem
    in the main thread, we now create the ActorSystems in non-daemonic mode. That
    way the process won't terminate if there is still an active actor.

----
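
The reasoning in the FLINK-7105 commit message rests on general JVM behaviour: the process exits once only daemon threads remain, so a non-daemon worker keeps it alive without the main thread having to wait on it. The sketch below illustrates that with a plain `Thread` rather than an Akka `ActorSystem`; the class and method names are made up for illustration.

```java
// Hypothetical illustration with plain threads (no Akka involved).
class NonDaemonSketch {

    // Creates, but does not start, a worker that sleeps and then reports.
    static Thread newWorker(boolean daemon, final long millis) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(millis);
                    System.out.println("worker finished");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "worker");
        // The daemon flag must be set before start().
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) {
        // Non-daemon worker: main returns right away without join(),
        // yet the JVM stays up until the worker has completed.
        // With daemon = true the JVM may exit before the worker reports.
        newWorker(false, 500).start();
        System.out.println("main returns");
    }
}
```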


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4261: [FLINK-7082] Add generic entry point for session and per-...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4261
  
    Looking at the various diagrams of FLIP-6, there are quite a few possible arrangements of the 'building blocks'. I wonder if composition over inheritance would be preferable as we develop the other modes. Maybe an approach similar to traits ([example](https://opencredo.com/traits-java-8-default-methods/)) that can be mixed into subclasses of `ClusterEntrypoint`.
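
To make the trait suggestion concrete, here is a rough sketch using Java 8 default methods. Everything in it is an assumption for illustration: the interface and class names are invented, and the components are only strings. It is not taken from the linked article or from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical accessor that the traits use to record what they start.
interface ComponentLog {
    List<String> components();
}

// Mode traits: each contributes the mode-specific component.
interface SessionMode extends ComponentLog {
    default void startModeComponents() { components().add("Dispatcher"); }
}

interface JobMode extends ComponentLog {
    default void startModeComponents() { components().add("JobManagerRunner"); }
}

// Deployment trait: contributes the matching resource manager.
interface StandaloneDeployment extends ComponentLog {
    default void startResourceManager() { components().add("StandaloneResourceManager"); }
}

// Interfaces cannot hold state, so the base class owns the list.
abstract class TraitBase implements ComponentLog {
    private final List<String> components = new ArrayList<>();

    @Override
    public List<String> components() { return components; }
}

// A concrete entry point is just a combination of one mode and one deployment.
class StandaloneSessionSketch extends TraitBase implements SessionMode, StandaloneDeployment {
    List<String> start() {
        startResourceManager();
        startModeComponents();
        return components();
    }
}

class StandaloneJobSketch extends TraitBase implements JobMode, StandaloneDeployment {
    List<String> start() {
        startResourceManager();
        startModeComponents();
        return components();
    }
}

class TraitDemo {
    public static void main(String[] args) {
        System.out.println(new StandaloneSessionSketch().start());
        System.out.println(new StandaloneJobSketch().start());
    }
}
```

Each new mode or deployment adds one interface, and a concrete entry point only lists the traits it mixes in.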



[GitHub] flink pull request #4261: [FLINK-7082] Add generic entry point for session a...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4261



[GitHub] flink pull request #4261: [FLINK-7082] Add generic entry point for session a...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4261#discussion_r129513928
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.entrypoint;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.security.SecurityUtils;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +
    +import akka.actor.ActorSystem;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Executor;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Base class for the Flink cluster entry points.
    + *
    + * <p>Specialization of this class can be used for the session mode and the per-job mode
    + */
    +public abstract class ClusterEntrypoint implements FatalErrorHandler {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
    +
    +	protected static final int SUCCESS_RETURN_CODE = 0;
    +	protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
    +	protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +	/** The lock to guard startup / shutdown / manipulation methods. */
    +	private final Object lock = new Object();
    +
    +	@GuardedBy("lock")
    +	private MetricRegistry metricRegistry = null;
    +
    +	@GuardedBy("lock")
    +	private HighAvailabilityServices haServices = null;
    +
    +	@GuardedBy("lock")
    +	private BlobServer blobServer = null;
    +
    +	@GuardedBy("lock")
    +	private HeartbeatServices heartbeatServices = null;
    +
    +	@GuardedBy("lock")
    +	private RpcService commonRpcService = null;
    +
    +	protected void startCluster(String[] args) {
    +		final ClusterConfiguration clusterConfiguration = parseArguments(args);
    +
    +		final Configuration configuration = loadConfiguration(clusterConfiguration);
    +
    +		try {
    +			SecurityContext securityContext = installSecurityContext(configuration);
    +
    +			securityContext.runSecured(new Callable<Void>() {
    +				@Override
    +				public Void call() throws Exception {
    +					runCluster(configuration);
    +
    +					return null;
    +				}
    +			});
    +		} catch (Throwable t) {
    +			LOG.error("Cluster initialization failed.", t);
    +
    +			try {
    +				shutDown(false);
    +			} catch (Throwable st) {
    +				LOG.error("Could not properly shut down cluster entrypoint.", st);
    +			}
    +
    +			System.exit(STARTUP_FAILURE_RETURN_CODE);
    +		}
    +	}
    +
    +	protected ClusterConfiguration parseArguments(String[] args) {
    +		ParameterTool parameterTool = ParameterTool.fromArgs(args);
    +
    +		final String configDir = parameterTool.get("configDir", "");
    +
    +		return new ClusterConfiguration(configDir);
    +	}
    +
    +	protected Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
    --- End diff --
    
    Yes, we should add support for dynamic properties. We can add the parsed dynamic properties after we have loaded the configuration. I would like to do it as a follow-up because I would also like to change the usage of static fields in `GlobalConfiguration`.
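
The ordering described above (load the configuration first, then add the parsed dynamic properties on top) could look roughly like the following. This is only a sketch with plain `java.util.Map` objects; it does not use Flink's `Configuration` or `BootstrapTools`, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; plain maps stand in for the real configuration type.
class DynamicPropertiesSketch {

    // Collects arguments of the form -Dkey=value; anything else is ignored.
    static Map<String, String> parseDynamicProperties(String[] args) {
        Map<String, String> props = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                int eq = arg.indexOf('=');
                // Require a non-empty key between "-D" and "=".
                if (eq > 2) {
                    props.put(arg.substring(2, eq), arg.substring(eq + 1));
                }
            }
        }
        return props;
    }

    // Dynamic properties are applied last, so they win over loaded values.
    static Map<String, String> overlay(Map<String, String> loaded, Map<String, String> dynamic) {
        Map<String, String> merged = new HashMap<>(loaded);
        merged.putAll(dynamic);
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for values read from a configuration file.
        Map<String, String> loaded = new HashMap<>();
        loaded.put("jobmanager.rpc.port", "6123");

        String[] cmd = {"--configDir", "/tmp/conf", "-Djobmanager.rpc.port=1234"};
        System.out.println(overlay(loaded, parseDynamicProperties(cmd)));
    }
}
```

Applying the overlay after loading means a value given on the command line takes precedence over the same key in the file, which is the point of dynamic properties when the file cannot be edited.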



[GitHub] flink issue #4261: [FLINK-7082] Add generic entry point for session and per-...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4261
  
    Thanks for the review @EronWright. We'll address your comments in a follow-up. I'll rebase this PR and, if Travis gives the green light, I'll merge it.



[GitHub] flink pull request #4261: [FLINK-7082] Add generic entry point for session a...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4261#discussion_r128109508
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
    +	protected Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
    --- End diff --
    
    Consider supporting dynamic properties, e.g. `jobmanager.sh -Djobmanager.rpc.port=1234` to assist with YARN/Mesos scenarios, where adjusting the `flink-conf.yaml` might not be feasible.
    
    As seen in `MesosTaskManagerRunner`:
    ```
    final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
    GlobalConfiguration.setDynamicProperties(dynamicProperties);
    ```
    




[GitHub] flink pull request #4261: [FLINK-7082] Add generic entry point for session a...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4261#discussion_r129519933
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
    +	protected Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
    --- End diff --
    
    Follow-up JIRA: https://issues.apache.org/jira/browse/FLINK-7270



[GitHub] flink issue #4261: [FLINK-7082] Add generic entry point for session and per-...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4261
  
    You're totally right about the current problem with inheritance when having multiple dimensions (job vs. session, YARN vs. standalone vs. Mesos). I've looked at the link you've posted and it looks really interesting. Once we have ditched Java 7 support, we can change the code to use this technique in order to reduce code duplication.
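
To illustrate the two-dimension problem mentioned above: with pure inheritance, two modes times three deployments means six concrete subclasses, whereas composing one object per dimension needs only two plus three building blocks. The sketch below is a hypothetical illustration that avoids Java 8 features; the enum and class names are invented and the components are only strings.

```java
import java.util.ArrayList;
import java.util.List;

// First dimension (hypothetical): which mode-specific component is started.
enum Mode {
    JOB("JobManagerRunner"),
    SESSION("Dispatcher");

    final String component;

    Mode(String component) { this.component = component; }
}

// Second dimension (hypothetical): which resource manager flavour is used.
enum Deployment {
    STANDALONE, YARN, MESOS;

    String resourceManager() { return name() + " resource manager"; }
}

// One class covers every combination by holding one value per dimension.
final class ComposedEntrypointSketch {
    private final Mode mode;
    private final Deployment deployment;

    ComposedEntrypointSketch(Mode mode, Deployment deployment) {
        this.mode = mode;
        this.deployment = deployment;
    }

    List<String> start() {
        List<String> started = new ArrayList<>();
        started.add(deployment.resourceManager());
        started.add(mode.component);
        return started;
    }

    public static void main(String[] args) {
        // Enumerates all mode/deployment combinations without extra subclasses.
        for (Mode m : Mode.values()) {
            for (Deployment d : Deployment.values()) {
                System.out.println(m + "/" + d + " -> " + new ComposedEntrypointSketch(m, d).start());
            }
        }
    }
}
```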

