Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/07/07 13:59:49 UTC

[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4281

    [FLINK-7108] [yarn] Add YARN entry points based on the generic entry point

    This PR is based on #4259, #4260, #4261 and #4272.
    
    Add the YarnSessionClusterEntrypoint and the YarnJobClusterEntrypoint, which extend
    SessionClusterEntrypoint and JobClusterEntrypoint, respectively.
    
    Add new Yarn session and per-job cluster entry points
    
    Remove old Flip-6 Yarn per job entry point
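
    For illustration, a minimal launch sketch under stated assumptions: the signature of
    YarnEntrypointUtils.loadConfiguration(workingDirectory, env) and the (Configuration,
    workingDirectory) constructor are taken from the diffs discussed below, while the
    startCluster() call and the wrapper class name are assumptions, not the PR's code.

        // Hypothetical launcher sketch -- not the PR's exact main() method.
        import org.apache.flink.configuration.Configuration;
        import org.apache.hadoop.yarn.api.ApplicationConstants;
        import java.util.Map;

        public class YarnSessionEntrypointLaunchSketch {
            public static void main(String[] args) {
                Map<String, String> env = System.getenv();
                // YARN exposes the container's working directory via the PWD environment variable
                String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());

                // turn the YARN environment plus the shipped flink-conf.yaml into a Flink Configuration
                Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);

                YarnSessionClusterEntrypoint entrypoint =
                    new YarnSessionClusterEntrypoint(configuration, workingDirectory);
                entrypoint.startCluster(); // assumed start method on the generic entry point
            }
        }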

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink yarnEntrypoints

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4281
    
----
commit 6c313e0ecc8ff84aee9403acbc60efa31d49f69e
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-04T21:15:52Z

    [FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component
    
    The Dispatcher is responsible for receiving job submissions, persisting the JobGraphs,
    spawning JobManagers to execute the jobs, and recovering the jobs in case of a master
    failure. This commit adds the basic skeleton, including the RPC call for job submission
    (an illustrative sketch of this RPC follows the commit list).
    
    Add cleanup logic for finished jobs
    
    Pass BlobService to JobManagerRunner

commit 44abd04f328a60d3d25f74454ad04bd5c79fc258
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-05T11:38:35Z

    [FLINK-7105] Make ActorSystems non-daemonic

    In order to avoid having to explicitly wait for the termination of an ActorSystem
    in the main thread, we now create ActorSystems in non-daemonic mode. That way the
    process won't terminate as long as there is still an active actor (see the Akka
    setting sketched after the commit list).

commit 4a2b08645a2e7d80c7404b6af6f396178de19e33
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-06T15:16:18Z

    [FLINK-7082] Add generic entry point for session and per-job clusters

commit 3768ca597e40230da4edc75fa6b6f3203e4ee313
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-06T15:16:54Z

    [FLINK-7086] Add Flip-6 standalone session cluster entry point

commit cb52bb4ea730529125556de6dc1201b90472eee7
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-05T15:16:52Z

    [FLINK-7108] [yarn] Add YARN entry points based on the generic entry point
    
    Add the YarnSessionClusterEntrypoint and the YarnJobClusterEntrypoint, which extend
    SessionClusterEntrypoint and JobClusterEntrypoint, respectively.
    
    Add new Yarn session and per-job cluster entry points
    
    Remove old Flip-6 Yarn per job entry point

----
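
For illustration only (not part of the commits above), two small sketches of the mechanisms the commit messages describe.

A hypothetical shape of the Dispatcher's job-submission RPC from FLINK-7103; the actual gateway name and signature in the commit may differ:

    // Hypothetical sketch -- not the commit's code. RpcGateway, RpcTimeout, Acknowledge,
    // Time, JobGraph and CompletableFuture are existing flink-runtime/JDK types.
    public interface DispatcherGateway extends RpcGateway {
        // Submit a JobGraph; the Dispatcher persists it and spawns a JobManager for it.
        CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, @RpcTimeout Time timeout);
    }

And the Akka setting behind the non-daemonic ActorSystems of FLINK-7105; Flink's own bootstrap goes through its Akka utilities, so the direct ActorSystem.create call here is only to show the knob:

    // Illustrative only: an ActorSystem whose threads are non-daemon threads.
    Config akkaConfig = ConfigFactory.parseString("akka.daemonic = off")
        .withFallback(ConfigFactory.load());
    ActorSystem actorSystem = ActorSystem.create("flink", akkaConfig);
    // With non-daemon threads the JVM stays alive until the ActorSystem terminates,
    // so the main thread does not need to block on its termination explicitly.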


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4281#discussion_r128129161
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn.entrypoint;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.SecurityOptions;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.security.SecurityUtils;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.yarn.Utils;
    +import org.apache.flink.yarn.YarnConfigKeys;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.slf4j.Logger;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class contains utility methods for the {@link YarnSessionClusterEntrypoint} and
    + * {@link YarnJobClusterEntrypoint}.
    + */
    +public class YarnEntrypointUtils {
    +
    +	public static SecurityContext installSecurityContext(
    +			Configuration configuration,
    +			String workingDirectory) throws Exception {
    +		org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
    +
    +		//To support Yarn Secure Integration Test Scenario
    +		File krb5Conf = new File(workingDirectory, Utils.KRB5_FILE_NAME);
    +		if (krb5Conf.exists() && krb5Conf.canRead()) {
    +			hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
    +			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
    +			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
    +		}
    +
    +		SecurityUtils.SecurityConfiguration sc;
    +		if (hadoopConfiguration != null) {
    +			sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
    +		} else {
    +			sc = new SecurityUtils.SecurityConfiguration(configuration);
    +		}
    +
    +		SecurityUtils.install(sc);
    +
    +		return SecurityUtils.getInstalledContext();
    +	}
    +
    +	public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env) {
    --- End diff --
    
    Much of this could be simplified by moving to standard dynamic properties. The code seems to convert from `YarnConfigKeys` settings to native Flink config settings.
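
    For concreteness, a sketch of the kind of conversion meant here, using keys and options
    visible in the imports of the quoted file; the exact keys the PR handles may differ:

        // Illustrative only -- the YarnConfigKeys -> Flink config translation pattern.
        // Imports (Configuration, GlobalConfiguration, SecurityOptions, YarnConfigKeys,
        // File, Map) are the ones already present in the quoted file above.
        public static Configuration loadConfigurationSketch(String workingDirectory, Map<String, String> env) {
            // start from the flink-conf.yaml shipped into the container's working directory
            Configuration configuration = GlobalConfiguration.loadConfiguration(workingDirectory);

            // translate keytab information passed via environment variables into Flink options
            String keytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
            if (keytabPath != null) {
                configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
                    new File(workingDirectory, keytabPath).getAbsolutePath());
            }

            String principal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
            if (principal != null) {
                configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
            }

            return configuration;
        }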



[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4281#discussion_r130036810
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn.entrypoint;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.SecurityOptions;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.security.SecurityUtils;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.yarn.Utils;
    +import org.apache.flink.yarn.YarnConfigKeys;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.slf4j.Logger;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class contains utility methods for the {@link YarnSessionClusterEntrypoint} and
    + * {@link YarnJobClusterEntrypoint}.
    + */
    +public class YarnEntrypointUtils {
    +
    +	public static SecurityContext installSecurityContext(
    +			Configuration configuration,
    +			String workingDirectory) throws Exception {
    +		org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
    +
    +		//To support Yarn Secure Integration Test Scenario
    --- End diff --
    
    I agree that including test code in the production codepath is not nice. Since this PR is mainly restructuring the existing YARN code, I would like to address this issue with a separate PR: https://issues.apache.org/jira/browse/FLINK-7288



[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4281



[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4281#discussion_r128130195
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn.entrypoint;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.yarn.YarnResourceManager;
    +
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.util.Map;
    +
    +public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
    +
    +	/** The job graph file path. */
    +	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
    +
    +	private final String workingDirectory;
    +
    +	public YarnJobClusterEntrypoint(
    +			Configuration configuration,
    +			String workingDirectory) {
    +
    +		super(configuration);
    +		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
    +	}
    +
    +	@Override
    +	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
    +		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
    +	}
    +
    +	@Override
    +	protected ResourceManager<?> createResourceManager(
    +			Configuration configuration,
    +			ResourceID resourceId,
    +			RpcService rpcService,
    +			HighAvailabilityServices highAvailabilityServices,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
    +		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
    +		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
    +			rmServicesConfiguration,
    +			highAvailabilityServices,
    +			rpcService.getScheduledExecutor());
    +
    +		return new YarnResourceManager(
    +			rpcService,
    +			ResourceManager.RESOURCE_MANAGER_NAME,
    +			resourceId,
    +			configuration,
    +			System.getenv(),
    +			rmConfiguration,
    +			highAvailabilityServices,
    +			heartbeatServices,
    +			rmRuntimeServices.getSlotManager(),
    +			metricRegistry,
    +			rmRuntimeServices.getJobLeaderIdService(),
    +			fatalErrorHandler);
    +	}
    +
    +	@Override
    +	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
    +		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
    --- End diff --
    
    Ideally, a standard approach to locating the job to launch could be developed across K8s/YARN/etc. It hadn't occurred to me that the Docker image would have a serialized job graph within it (as opposed to the packaged program with a `main` method), but it definitely simplifies the recovery model. This has me wondering how `flink run` would combine with this.



[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4281#discussion_r130038556
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn.entrypoint;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.yarn.YarnResourceManager;
    +
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.util.Map;
    +
    +public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
    +
    +	/** The job graph file path. */
    +	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
    +
    +	private final String workingDirectory;
    +
    +	public YarnJobClusterEntrypoint(
    +			Configuration configuration,
    +			String workingDirectory) {
    +
    +		super(configuration);
    +		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
    +	}
    +
    +	@Override
    +	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
    +		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
    +	}
    +
    +	@Override
    +	protected ResourceManager<?> createResourceManager(
    +			Configuration configuration,
    +			ResourceID resourceId,
    +			RpcService rpcService,
    +			HighAvailabilityServices highAvailabilityServices,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
    +		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
    +		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
    +			rmServicesConfiguration,
    +			highAvailabilityServices,
    +			rpcService.getScheduledExecutor());
    +
    +		return new YarnResourceManager(
    +			rpcService,
    +			ResourceManager.RESOURCE_MANAGER_NAME,
    +			resourceId,
    +			configuration,
    +			System.getenv(),
    +			rmConfiguration,
    +			highAvailabilityServices,
    +			heartbeatServices,
    +			rmRuntimeServices.getSlotManager(),
    +			metricRegistry,
    +			rmRuntimeServices.getJobLeaderIdService(),
    +			fatalErrorHandler);
    +	}
    +
    +	@Override
    +	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
    +		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
    --- End diff --
    
    How we do this in the container world is indeed still up for discussion. I could imagine different approaches: adding the `JobGraph` to the container image (if possible), retrieving it from a service (e.g. an HTTP server), or reading it from a network-mounted disk. For the moment, though, this should not affect this PR.
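
    For concreteness, a minimal sketch of the file-based variant discussed here, assuming the
    client Java-serializes the JobGraph into the `job.graph` file referenced in the diff; the
    PR's actual retrieveJobGraph() may differ in details such as error handling:

        // Minimal sketch of file-based JobGraph retrieval. Imports are the ones already
        // present in the quoted YarnJobClusterEntrypoint above.
        @Override
        protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
            String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
            File file = new File(workingDirectory, jobGraphFile);

            try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
                return (JobGraph) in.readObject();
            } catch (FileNotFoundException e) {
                throw new FlinkException("Could not find the JobGraph file " + file + '.', e);
            } catch (ClassNotFoundException | IOException e) {
                throw new FlinkException("Could not read the JobGraph from file " + file + '.', e);
            }
        }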



[GitHub] flink issue #4281: [FLINK-7108] [yarn] Add YARN entry points based on the ge...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4281
  
    Thanks for your review @EronWright. I will rebase this PR to see whether it passes Travis.



[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4281#discussion_r128129505
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn.entrypoint;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.SecurityOptions;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.security.SecurityUtils;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.yarn.Utils;
    +import org.apache.flink.yarn.YarnConfigKeys;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.slf4j.Logger;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class contains utility methods for the {@link YarnSessionClusterEntrypoint} and
    + * {@link YarnJobClusterEntrypoint}.
    + */
    +public class YarnEntrypointUtils {
    +
    +	public static SecurityContext installSecurityContext(
    +			Configuration configuration,
    +			String workingDirectory) throws Exception {
    +		org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
    +
    +		//To support Yarn Secure Integration Test Scenario
    --- End diff --
    
    I think this was a workaround for an issue with the Kerberos variant of the Yarn integration test. There must be a simpler way; it seems wrong to have test code in the production codepath.



[GitHub] flink pull request #4281: [FLINK-7108] [yarn] Add YARN entry points based on...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4281#discussion_r130037857
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn.entrypoint;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.SecurityOptions;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.security.SecurityUtils;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.yarn.Utils;
    +import org.apache.flink.yarn.YarnConfigKeys;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.slf4j.Logger;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * This class contains utility methods for the {@link YarnSessionClusterEntrypoint} and
    + * {@link YarnJobClusterEntrypoint}.
    + */
    +public class YarnEntrypointUtils {
    +
    +	public static SecurityContext installSecurityContext(
    +			Configuration configuration,
    +			String workingDirectory) throws Exception {
    +		org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
    +
    +		//To support Yarn Secure Integration Test Scenario
    +		File krb5Conf = new File(workingDirectory, Utils.KRB5_FILE_NAME);
    +		if (krb5Conf.exists() && krb5Conf.canRead()) {
    +			hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
    +			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
    +			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
    +		}
    +
    +		SecurityUtils.SecurityConfiguration sc;
    +		if (hadoopConfiguration != null) {
    +			sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
    +		} else {
    +			sc = new SecurityUtils.SecurityConfiguration(configuration);
    +		}
    +
    +		SecurityUtils.install(sc);
    +
    +		return SecurityUtils.getInstalledContext();
    +	}
    +
    +	public static Configuration loadConfiguration(String workingDirectory, Map<String, String> env) {
    --- End diff --
    
    Yes, this should be refactored. We should write the dynamic properties to the configuration file and make that file available to the started YARN container. It's mainly for historical reasons that we still send the dynamic properties encoded in an environment variable. Since this is an orthogonal issue, I would like to address it with a different PR. It is also related to https://issues.apache.org/jira/browse/FLINK-7269
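
    As a hedged sketch of that refactoring direction (the variable names clientConfigDir,
    dynamicProperties and shipDirectory are hypothetical), the client side could merge the
    dynamic properties into the configuration and ship a flink-conf.yaml with the container:

        // Sketch only: merge dynamic properties into the configuration and write the
        // result so the container-side entry point can simply load flink-conf.yaml.
        // (BootstrapTools.writeConfiguration declares IOException, handled by the caller.)
        Configuration configuration = GlobalConfiguration.loadConfiguration(clientConfigDir);
        configuration.addAll(dynamicProperties); // a Configuration built from the -D options

        BootstrapTools.writeConfiguration(configuration, new File(shipDirectory, "flink-conf.yaml"));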

