Posted to issues@flink.apache.org by suez1224 <gi...@git.apache.org> on 2018/04/23 07:02:48 UTC

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/5896

    [FLINK-8286][Security] Fix kerberos security configuration for YarnTaskExecutor

    ## What is the purpose of the change
    
    Fix the broken YARN Kerberos integration for FLIP-6.
    
    
    ## Brief change log
    
      - Fix Kerberos configurations.
      - Refactor code.
      - Add unit test.
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
      - *Added a test that validates that Kerberos credentials are set correctly*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink FLINK-8286

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5896
    
----
commit dfeb614c5d5ecba21390c4ee7ceefdefa3a48bf1
Author: Shuyi Chen <sh...@...>
Date:   2018-04-23T00:35:37Z

    Fix kerberos security configuration for YarnTaskExecutor

----


---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185050544
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    --- End diff --
    
    Why is this exception being swallowed?


---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185134529
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    --- End diff --
    
    Good point. Added exceptions to method signature and let caller handle it.
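
The difference between swallowing the failure and declaring it can be sketched in isolation. This is not Flink code: `RunnerFactorySketch`, `loadConfiguration`, and the `PWD` lookup are hypothetical stand-ins assumed purely for illustration. The first factory logs and returns null, as in the diff under review; the second declares the exception so that the caller decides how to handle it.

```java
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the factory under review; none of these names are Flink APIs.
final class RunnerFactorySketch {

    // Simulated configuration loading (assumption: it fails when no directory is given).
    static Map<String, String> loadConfiguration(String dir) {
        if (dir == null) {
            throw new IllegalStateException("working directory is not set");
        }
        return Map.of("conf.dir", dir);
    }

    // Swallowing variant: the failure is only logged and the caller receives null.
    static Callable<String> createSwallowing(Map<String, String> envs) {
        try {
            Map<String, String> conf = loadConfiguration(envs.get("PWD"));
            return () -> "running with " + conf.get("conf.dir");
        } catch (Throwable t) {
            System.err.println(t.getMessage());
            return null;
        }
    }

    // Propagating variant: the failure is part of the signature and reaches the caller.
    static Callable<String> create(Map<String, String> envs) throws Exception {
        Map<String, String> conf = loadConfiguration(envs.get("PWD"));
        return () -> "running with " + conf.get("conf.dir");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(create(Map.of("PWD", "/tmp/container")).call());
        System.out.println("swallowing variant returned: " + createSwallowing(Map.of()));
        try {
            create(Map.of());
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

With the swallowing variant the caller only finds out later, typically through a NullPointerException far from the real cause; with the propagating variant the original exception arrives at the call site.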


---

[GitHub] flink issue #5896: [FLINK-8286][Security] Fix kerberos security configuratio...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5896
  
    @aljoscha that's a very good catch. I added another commit to make sure the call order is always correct (https://github.com/apache/flink/pull/5896/commits/1298b04ea80b9160ac7e4ae8a171f80112a95945). Could you please let me know what you think? I think we should have the unit test in place, since this is the second time this has broken. I can create another PR for the refactoring commit.


---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185135031
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    +			return null;
    +		}
     
    -			// configure local directory
    -			if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -				LOG.info("Overriding YARN's temporary file directories with those " +
    -					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    -			}
    -			else {
    -				LOG.info("Setting directories for temporary files to: {}", localDirs);
    -				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    -			}
    -
    -			// tell akka to die in case of an error
    -			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +		// configure local directory
    +		if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +			LOG.info("Overriding YARN's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    +		}
    +		else {
    +			LOG.info("Setting directories for temporary files to: {}", localDirs);
    +			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    +		}
     
    -			String keytabPath = null;
    -			if (remoteKeytabPath != null) {
    -				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    -				keytabPath = f.getAbsolutePath();
    -				LOG.info("keytab path: {}", keytabPath);
    -			}
    +		// tell akka to die in case of an error
    +		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
     
    +		try {
     			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     
     			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
     					currentUser.getShortUserName(), yarnClientUsername);
     
    +			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    --- End diff --
    
    No, this is a refactoring because that part of the code is redundant.
    
    The real change is moving the code block below to before the call to "new SecurityConfiguration()":
    
    configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
    configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
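
Why the position of those two lines matters can be shown with a toy model. It assumes that the security configuration object copies the keytab and principal out of the configuration when it is constructed, so values set afterwards are never seen. `ConfigOrderSketch` and `SnapshotSecurityConfig` are hypothetical names, and the configuration is modelled as a plain map; the two key strings are used here only as map keys.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Flink code: a config object that snapshots values in its constructor.
final class ConfigOrderSketch {
    static final String KEYTAB_KEY = "security.kerberos.login.keytab";
    static final String PRINCIPAL_KEY = "security.kerberos.login.principal";

    // Hypothetical stand-in for a security configuration that reads once, at construction.
    static final class SnapshotSecurityConfig {
        final String keytab;
        final String principal;

        SnapshotSecurityConfig(Map<String, String> conf) {
            this.keytab = conf.get(KEYTAB_KEY);
            this.principal = conf.get(PRINCIPAL_KEY);
        }

        boolean usesKerberos() {
            return keytab != null && principal != null;
        }
    }

    // Broken order: the snapshot is taken before the values are written.
    static SnapshotSecurityConfig setAfterConstruct(String keytabPath, String principal) {
        Map<String, String> conf = new HashMap<>();
        SnapshotSecurityConfig sc = new SnapshotSecurityConfig(conf);
        conf.put(KEYTAB_KEY, keytabPath);
        conf.put(PRINCIPAL_KEY, principal);
        return sc;
    }

    // Fixed order: the values are written first, then the snapshot is taken.
    static SnapshotSecurityConfig setBeforeConstruct(String keytabPath, String principal) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEYTAB_KEY, keytabPath);
        conf.put(PRINCIPAL_KEY, principal);
        return new SnapshotSecurityConfig(conf);
    }

    public static void main(String[] args) {
        System.out.println("set after construct:  "
            + setAfterConstruct("/tmp/krb5.keytab", "user@EXAMPLE.COM").usesKerberos());
        System.out.println("set before construct: "
            + setBeforeConstruct("/tmp/krb5.keytab", "user@EXAMPLE.COM").usesKerberos());
    }
}
```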


---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185422695
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    +			return null;
    +		}
     
    -			// configure local directory
    -			if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -				LOG.info("Overriding YARN's temporary file directories with those " +
    -					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    -			}
    -			else {
    -				LOG.info("Setting directories for temporary files to: {}", localDirs);
    -				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    -			}
    -
    -			// tell akka to die in case of an error
    -			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +		// configure local directory
    +		if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +			LOG.info("Overriding YARN's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    +		}
    +		else {
    +			LOG.info("Setting directories for temporary files to: {}", localDirs);
    +			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    +		}
     
    -			String keytabPath = null;
    -			if (remoteKeytabPath != null) {
    -				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    -				keytabPath = f.getAbsolutePath();
    -				LOG.info("keytab path: {}", keytabPath);
    -			}
    +		// tell akka to die in case of an error
    +		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
     
    +		try {
     			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     
     			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
     					currentUser.getShortUserName(), yarnClientUsername);
     
    +			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    --- End diff --
    
    I see, could you maybe split this PR in two commits then? One that does the refactoring and one that does the actual fix. This way, it's clearer what exactly the fix is.


---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185606093
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    +			return null;
    +		}
     
    -			// configure local directory
    -			if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -				LOG.info("Overriding YARN's temporary file directories with those " +
    -					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    -			}
    -			else {
    -				LOG.info("Setting directories for temporary files to: {}", localDirs);
    -				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    -			}
    -
    -			// tell akka to die in case of an error
    -			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +		// configure local directory
    +		if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +			LOG.info("Overriding YARN's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    +		}
    +		else {
    +			LOG.info("Setting directories for temporary files to: {}", localDirs);
    +			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    +		}
     
    -			String keytabPath = null;
    -			if (remoteKeytabPath != null) {
    -				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    -				keytabPath = f.getAbsolutePath();
    -				LOG.info("keytab path: {}", keytabPath);
    -			}
    +		// tell akka to die in case of an error
    +		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
     
    +		try {
     			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     
     			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
     					currentUser.getShortUserName(), yarnClientUsername);
     
    +			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    --- End diff --
    
    @aljoscha done, please take another look. Thanks.


---

[GitHub] flink issue #5896: [FLINK-8286][Security] Fix kerberos security configuratio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5896
  
    @suez1224 I'll now merge the actual fix, but I'm not 100 % sure the refactoring is correct.
    
    After the fix, we have roughly this path through the code:
    ```
    if (keytabPath != null && remoteKeytabPrincipal != null) {
    	configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
    	configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
    }
    
    SecurityConfiguration sc = new SecurityConfiguration(configuration);
    
    SecurityUtils.install(sc);
    
    SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
    	@Override
    	public Void call() throws Exception {
    		TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
    		return null;
    	}
    });
    ```
    
    After the refactoring, that becomes:
    
    ```
    // in main()
    SecurityUtils.getInstalledContext().runSecured(
       YarnTaskExecutorRunnerFactory.create(System.getenv()));
    
    // in YarnTaskExecutorRunnerFactory.create()
    if (keytabPath != null && remoteKeytabPrincipal != null) {
    	configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
    	configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
    }
    
    SecurityConfiguration sc = new SecurityConfiguration(configuration);
    
    SecurityUtils.install(sc);
    
    return new Runner()
    ```
    
    This means that if someone changes how things are called, `SecurityUtils.getInstalledContext()` can end up being called before `SecurityUtils.install(sc)` runs inside `YarnTaskExecutorRunnerFactory.create`. I think this can potentially lead to problems, and the old path is much clearer. What do you think?
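
The ordering concern can be reproduced with a toy registry. Java evaluates the receiver of a method call before its arguments, so in an expression of the form `getInstalledContext().runSecured(create(...))` the context is fetched before `create` has a chance to install one. Everything below is a hypothetical sketch, not Flink code; in particular it assumes the registry starts with a no-op context until `install` is called.

```java
import java.util.concurrent.Callable;

// Toy registry, not Flink code: shows receiver-before-argument evaluation order.
final class InstallOrderSketch {

    interface Context {
        String runSecured(Callable<String> task) throws Exception;
    }

    // Assumed default: a no-op context is in place until install() is called.
    static final Context NO_OP = task -> "no-op:" + task.call();
    static Context installed = NO_OP;

    static void install(String principal) {
        installed = task -> principal + ":" + task.call();
    }

    static Context getInstalledContext() {
        return installed;
    }

    static void reset() {
        installed = NO_OP;
    }

    // Factory with a side effect: installs the context, then returns the task.
    static Callable<String> create(String principal) {
        install(principal);
        return () -> "task";
    }

    // Shape of the refactored main(): the receiver is evaluated before create() runs.
    static String refactoredMain(String principal) throws Exception {
        return getInstalledContext().runSecured(create(principal));
    }

    // Explicit ordering: create (and therefore install) first, then fetch the context.
    static String explicitOrderMain(String principal) throws Exception {
        Callable<String> runner = create(principal);
        return getInstalledContext().runSecured(runner);
    }

    public static void main(String[] args) throws Exception {
        reset();
        System.out.println("refactored shape: " + refactoredMain("alice"));
        reset();
        System.out.println("explicit order:   " + explicitOrderMain("alice"));
    }
}
```

In this sketch the refactored shape runs the task under the no-op context, while the explicit ordering runs it under the installed one. Splitting the call into two statements makes the intended order visible and harder to break by accident.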



---

[GitHub] flink issue #5896: [FLINK-8286][Security] Fix kerberos security configuratio...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5896
  
    This PR seems to obfuscate the fix.  The issue was with the interpretation of the keytab path, right?  But the bulk of the change was to clarify the ordering of context installation vs use?


---

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185052704
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +					YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    +			return null;
    +		}
     
    -			// configure local directory
    -			if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -				LOG.info("Overriding YARN's temporary file directories with those " +
    -					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    -			}
    -			else {
    -				LOG.info("Setting directories for temporary files to: {}", localDirs);
    -				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    -			}
    -
    -			// tell akka to die in case of an error
    -			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +		// configure local directory
    +		if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +			LOG.info("Overriding YARN's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    +		}
    +		else {
    +			LOG.info("Setting directories for temporary files to: {}", localDirs);
    +			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    +		}
     
    -			String keytabPath = null;
    -			if (remoteKeytabPath != null) {
    -				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    -				keytabPath = f.getAbsolutePath();
    -				LOG.info("keytab path: {}", keytabPath);
    -			}
    +		// tell akka to die in case of an error
    +		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
     
    +		try {
     			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     
     			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
     					currentUser.getShortUserName(), yarnClientUsername);
     
    +			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    --- End diff --
    
    Is the only change really that we always do this instead of having the check on `remoteKeytabPath`, as the old code had?
    
    The old code had this on line 120:
    ```
    if (remoteKeytabPath != null) {
    	File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    	keytabPath = f.getAbsolutePath();
    	LOG.info("keytab path: {}", keytabPath);
    }
    ```
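
The behavioural difference being asked about can be sketched as follows. This is an illustration only: `KeytabPathSketch` is a hypothetical class, and the file name `krb5.keytab` is an assumed value for `Utils.KEYTAB_FILE_NAME`. The guarded variant yields no keytab path when no remote keytab was shipped; the unguarded variant always yields a path, whether or not a file exists there.

```java
import java.io.File;

// Illustration only, not Flink code.
final class KeytabPathSketch {
    // Assumed value; stands in for Utils.KEYTAB_FILE_NAME.
    static final String KEYTAB_FILE_NAME = "krb5.keytab";

    // Old behaviour: resolve the local keytab only if a remote keytab path was provided.
    static String guarded(String currDir, String remoteKeytabPath) {
        if (remoteKeytabPath == null) {
            return null;
        }
        return new File(currDir, KEYTAB_FILE_NAME).getAbsolutePath();
    }

    // Behaviour in the diff under review: always resolve the local keytab path.
    static String unguarded(String currDir) {
        return new File(currDir, KEYTAB_FILE_NAME).getAbsolutePath();
    }

    public static void main(String[] args) {
        System.out.println("guarded, no remote keytab: " + guarded("/tmp/container", null));
        System.out.println("unguarded:                 " + unguarded("/tmp/container"));
    }
}
```

Any later check of the form `keytabPath != null` therefore behaves differently in the two variants when Kerberos is not configured.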


---

[GitHub] flink issue #5896: [FLINK-8286][Security] Fix kerberos security configuratio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5896
  
    @EronWright the fix is really just this: https://github.com/apache/flink/commit/ba3e2711a2091b7c0907a5caf3ea527f837a442f However, I haven't yet managed to reproduce a failure on my system without the fix. Still working on it.


---

[GitHub] flink issue #5896: [FLINK-8286][Security] Fix kerberos security configuratio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5896
  
    @suez1224 Did you rebase on master? I think that krb5-specific code should not be there anymore.


---

[GitHub] flink issue #5896: [FLINK-8286][Security] Fix kerberos security configuratio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5896
  
    Merged, could you please close the PR after discussion of the refactoring?


---