You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2018/05/03 15:35:13 UTC

[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/5953

    [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos integration for flip-6

    Alternative version of #5901

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-9235-flip-6-yarn-secured-test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5953.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5953
    
----
commit 9ca7852fe1a6f1a86d05ba2f7851199e43411579
Author: Aljoscha Krettek <al...@...>
Date:   2018-05-03T14:23:29Z

    Remove special-case krb5.conf code from YARN runners

commit b4fb889c911ccef0521cd9ed8839190a12f2d4a5
Author: Aljoscha Krettek <al...@...>
Date:   2018-05-03T14:27:40Z

    [FLINK-9235] Test new FLIP-6 code in YARNSessionFIFOSecuredITCase
    
    Before, always setting mode to LEGACY_MODE when security settings are
    present caused the test never to run with the new code.
    
    For this, we also need to actually execute an example. Otherwise, no
    TaskExecutors would be brought up.

----


---

[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/5953


---

[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5953#discussion_r186350483
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---
    @@ -85,21 +91,56 @@ public void checkForProhibitedLogContents() {
     	public void testDetachedMode() throws InterruptedException, IOException {
     		LOG.info("Starting testDetachedMode()");
     		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
    -		Runner runner =
    -			startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
    -						"-t", flinkLibFolder.getAbsolutePath(),
    -						"-n", "1",
    -						"-jm", "768",
    -						"-tm", "1024",
    -						"--name", "MyCustomName", // test setting a custom name
    -						"--detached"},
    +
    +		File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
    +		// get temporary file for reading input data for wordcount example
    +		File tmpInFile = tmp.newFile();
    +		FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
    +
    +		ArrayList<String> args = new ArrayList<>();
    +		args.add("-j"); args.add(flinkUberjar.getAbsolutePath());
    +		args.add("-t"); args.add(flinkLibFolder.getAbsolutePath());
    +		args.add("-n"); args.add("1");
    +		args.add("-jm"); args.add("768");
    +		args.add("-tm"); args.add("1024");
    +		if (SecureTestEnvironment.getTestKeytab() != null) {
    +			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + "=" + SecureTestEnvironment.getTestKeytab());
    +		}
    +		if (SecureTestEnvironment.getHadoopServicePrincipal() != null) {
    +			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + "=" + SecureTestEnvironment.getHadoopServicePrincipal());
    +		}
    +		args.add("--name"); args.add("MyCustomName");
    +		args.add("--detached");
    +		Runner clusterRunner =
    +			startWithArgs(
    +				args.toArray(new String[args.size()]),
     				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
     
     		// before checking any strings outputted by the CLI, first give it time to return
    -		runner.join();
    -		checkForLogString("The Flink YARN client has been started in detached mode");
    +		clusterRunner.join();
     
     		if (!isNewMode) {
    +			checkForLogString("The Flink YARN client has been started in detached mode");
    +
    +			// in legacy mode we have to wait until the TMs are up until we can submit the job
    +			LOG.info("Waiting until two containers are running");
    +			// wait until two containers are running
    +			while (getRunningContainers() < 2) {
    --- End diff --
    
    is it guaranteed that if the container runs it is already registered with the JobManager?


---

[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5953#discussion_r186343907
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---
    @@ -85,21 +91,56 @@ public void checkForProhibitedLogContents() {
     	public void testDetachedMode() throws InterruptedException, IOException {
     		LOG.info("Starting testDetachedMode()");
     		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
    -		Runner runner =
    -			startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
    -						"-t", flinkLibFolder.getAbsolutePath(),
    -						"-n", "1",
    -						"-jm", "768",
    -						"-tm", "1024",
    -						"--name", "MyCustomName", // test setting a custom name
    -						"--detached"},
    +
    +		File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
    +		// get temporary file for reading input data for wordcount example
    +		File tmpInFile = tmp.newFile();
    +		FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
    +
    +		ArrayList<String> args = new ArrayList<>();
    +		args.add("-j"); args.add(flinkUberjar.getAbsolutePath());
    --- End diff --
    
    😄  I see what you're trying to do here, but I'm fairly certain this will get auto-formatted at some point.
    
    I would do it like this:
    ```
    ArrayList<String> args = new ArrayList<>();
    args.add("-j");
    args.add(flinkUberjar.getAbsolutePath());
    
    args.add("-t");
    args.add(flinkLibFolder.getAbsolutePath());
    
    ...
    ```


---

[GitHub] flink issue #5953: [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5953
  
    hey, it's working now 😃 
    
    cc @zentol 


---

[GitHub] flink issue #5953: [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5953
  
    R: @zentol @suez1224 
    
    At least the secured ITCase is currently failing for legacy mode. Investigating...


---

[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5953#discussion_r186350834
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---
    @@ -85,21 +91,56 @@ public void checkForProhibitedLogContents() {
     	public void testDetachedMode() throws InterruptedException, IOException {
     		LOG.info("Starting testDetachedMode()");
     		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
    -		Runner runner =
    -			startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
    -						"-t", flinkLibFolder.getAbsolutePath(),
    -						"-n", "1",
    -						"-jm", "768",
    -						"-tm", "1024",
    -						"--name", "MyCustomName", // test setting a custom name
    -						"--detached"},
    +
    +		File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
    +		// get temporary file for reading input data for wordcount example
    +		File tmpInFile = tmp.newFile();
    +		FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
    +
    +		ArrayList<String> args = new ArrayList<>();
    +		args.add("-j"); args.add(flinkUberjar.getAbsolutePath());
    +		args.add("-t"); args.add(flinkLibFolder.getAbsolutePath());
    +		args.add("-n"); args.add("1");
    +		args.add("-jm"); args.add("768");
    +		args.add("-tm"); args.add("1024");
    +		if (SecureTestEnvironment.getTestKeytab() != null) {
    +			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + "=" + SecureTestEnvironment.getTestKeytab());
    +		}
    +		if (SecureTestEnvironment.getHadoopServicePrincipal() != null) {
    +			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + "=" + SecureTestEnvironment.getHadoopServicePrincipal());
    +		}
    +		args.add("--name"); args.add("MyCustomName");
    +		args.add("--detached");
    +		Runner clusterRunner =
    +			startWithArgs(
    +				args.toArray(new String[args.size()]),
     				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
     
     		// before checking any strings outputted by the CLI, first give it time to return
    -		runner.join();
    -		checkForLogString("The Flink YARN client has been started in detached mode");
    +		clusterRunner.join();
     
     		if (!isNewMode) {
    +			checkForLogString("The Flink YARN client has been started in detached mode");
    +
    +			// in legacy mode we have to wait until the TMs are up until we can submit the job
    +			LOG.info("Waiting until two containers are running");
    +			// wait until two containers are running
    +			while (getRunningContainers() < 2) {
    --- End diff --
    
    there a comment at L 151: `// additional sleep for the JM/TM to start and establish connection`, the job execution should probably happen after that...


---