You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/02 08:21:00 UTC

[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

    [ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460696#comment-16460696 ] 

ASF GitHub Bot commented on FLINK-8286:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185422695
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    +			return null;
    +		}
     
    -			// configure local directory
    -			if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -				LOG.info("Overriding YARN's temporary file directories with those " +
    -					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    -			}
    -			else {
    -				LOG.info("Setting directories for temporary files to: {}", localDirs);
    -				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    -			}
    -
    -			// tell akka to die in case of an error
    -			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +		// configure local directory
    +		if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +			LOG.info("Overriding YARN's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    +		}
    +		else {
    +			LOG.info("Setting directories for temporary files to: {}", localDirs);
    +			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    +		}
     
    -			String keytabPath = null;
    -			if (remoteKeytabPath != null) {
    -				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    -				keytabPath = f.getAbsolutePath();
    -				LOG.info("keytab path: {}", keytabPath);
    -			}
    +		// tell akka to die in case of an error
    +		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
     
    +		try {
     			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     
     			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
     					currentUser.getShortUserName(), yarnClientUsername);
     
    +			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    --- End diff --
    
    I see, could you maybe split this PR in two commits then? One that does the refactoring and one that does the actual fix. This way, it's clearer what exactly the fix is.


> Fix Flink-Yarn-Kerberos integration for FLIP-6
> ----------------------------------------------
>
>                 Key: FLINK-8286
>                 URL: https://issues.apache.org/jira/browse/FLINK-8286
>             Project: Flink
>          Issue Type: Bug
>          Components: Security
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos in Flip-6 is broken. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)