You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/05/26 10:47:39 UTC

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/6081

    [FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback

    ## What is the purpose of the change
    
    *This pull request sets the user code class loader for the thread factory*
    
    
    ## Brief change log
    
      - *Sets the user code class loader for the thread factory*
    
    ## Verifying this change
    
    This change is already covered by existing tests*.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-8067

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6081.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6081
    
----
commit 2f11cabedc62ceded05a2ef02b88abfc79a77ba1
Author: yanghua <ya...@...>
Date:   2018-05-26T10:44:21Z

    [FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback

----


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    cc @pnowojski does this PR look good to you?


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    cc @pnowojski please review this, thanks~


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    @pnowojski  test error has fixed~


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    Thanks for the contribution! Merged.


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195012204
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -1239,6 +1286,50 @@ protected void cancelTask() throws Exception {
     
     	}
     
    +	/**
    +	 * A task that register a processing time service callback.
    +	 */
    +	public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
    +
    +		public TimeServiceTask(Environment env) {
    +			super(env, null);
    +		}
    +
    +		@Override
    +		protected void init() throws Exception {
    +			getProcessingTimeService().registerTimer(1000, new ProcessingTimeCallback() {
    --- End diff --
    
    Please change `1000` to `0`. This parameter is not a delay in miliseconds/seconds from now, but specifies exact timestamp when the timer should fire, thus value `1000` is misleading.


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    👍 Yes, I have already approved it on github:
    > pnowojski approved these changes 23 hours ago


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195123173
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
     		}
     	}
     
    +	/**
    +	 * Test set user code ClassLoader before calling ProcessingTimeCallback.
    +	 */
    +	@Test
    +	public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
    +		syncLatch = new OneShotLatch();
    +
    +		try (MockEnvironment mockEnvironment =
    +			new MockEnvironmentBuilder()
    +				.setUserCodeClassLoader(new TestUserCodeClassLoader())
    +				.build()) {
    +			TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
    +
    +			final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
    +
    +			CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
    +				() -> {
    +					try {
    +						timerServiceTask.invoke();
    +					} catch (Exception e) {
    +						atomicThrowable.set(e);
    +					}
    +				},
    +				TestingUtils.defaultExecutor());
    +
    +			// wait until the invoke is complete
    +			invokeFuture.get();
    +
    +			assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
    +			assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
    +
    +			// check if an exception occurred
    --- End diff --
    
    tiny nits: you could drop this (and the one above) comment. The code is pretty self explanatory on it's own.


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    @yanghua why is it difficult? Can not you create more or less similar test to @GJL's `testSetsUserCodeClassLoader` from here: https://github.com/apache/flink/pull/4980/files. With couple side notes:
    
    1. build `MockEnvironment` using `MockEnvironmentBuilder` with a custom user class loader - do not use mockito for that
    2. create a stream task (reusing one of the implementation in `StreamTaskTest.java` or create a new one)
    3. register a timer service, wait for it's triggering and assert the user class loader in `ProcessingTimeCallback#onProcessingTime`


---

[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6081
  
    @pnowojski I've tried but it seams it's hard to do this. The class loader can not be accessed out of the class. @GJL any suggestion? Or need to test?


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195007239
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -806,6 +819,40 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
     		}
     	}
     
    +	/**
    +	 * Test set user code ClassLoader before calling ProcessingTimeCallback.
    +	 */
    +	@Test
    +	public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
    +		try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
    --- End diff --
    
    Maybe reformat to:
    ```
    try (MockEnvironment mockEnvironment = 
    		 new MockEnvironmentBuilder()
    			 .setUserCodeClassLoader(new TestUserCodeClassLoader())
    			 .build()) {
    	
    }
    ```
    ?


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195007487
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -806,6 +819,40 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
     		}
     	}
     
    +	/**
    +	 * Test set user code ClassLoader before calling ProcessingTimeCallback.
    +	 */
    +	@Test
    +	public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
    +		try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
    +			.setUserCodeClassLoader(new TestUserCodeClassLoader()).build()) {
    +			TimeServiceTask timerServiceTask = spy(new TimeServiceTask(mockEnvironment));
    --- End diff --
    
    why do you need `spy` here?


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195011678
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -1239,6 +1286,50 @@ protected void cancelTask() throws Exception {
     
     	}
     
    +	/**
    +	 * A task that register a processing time service callback.
    +	 */
    +	public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
    +
    +		public TimeServiceTask(Environment env) {
    +			super(env, null);
    +		}
    +
    +		@Override
    +		protected void init() throws Exception {
    +			getProcessingTimeService().registerTimer(1000, new ProcessingTimeCallback() {
    +				@Override
    +				public void onProcessingTime(long timestamp) throws Exception {
    +					classLoaders.add(Thread.currentThread().getContextClassLoader());
    +				}
    +			});
    +		}
    +
    +		@Override
    +		protected void run() throws Exception {
    --- End diff --
    
    The test is failing on the travis. Probably because here in `run()` you do not wait for at least one execution of `onProcessingTime(...)`. Probably you need to add a `OneShotLatch` and `tigger` it in `onProcessingTime` while `await`'ing in `run()`.


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195125463
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
     		}
     	}
     
    +	/**
    +	 * Test set user code ClassLoader before calling ProcessingTimeCallback.
    +	 */
    +	@Test
    +	public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
    +		syncLatch = new OneShotLatch();
    +
    +		try (MockEnvironment mockEnvironment =
    +			new MockEnvironmentBuilder()
    +				.setUserCodeClassLoader(new TestUserCodeClassLoader())
    +				.build()) {
    +			TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
    +
    +			final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
    +
    +			CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
    +				() -> {
    +					try {
    +						timerServiceTask.invoke();
    +					} catch (Exception e) {
    +						atomicThrowable.set(e);
    --- End diff --
    
    you can fail this with a `CompletionException` instead, then we don't need the atomic reference and will fail at `invokeFuture.get`


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195013346
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -169,6 +175,13 @@
     
     	private static OneShotLatch syncLatch;
     
    +	private static final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
    --- End diff --
    
    maybe move this field to `TimeServiceTask` since it's tightly connected to it.


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6081#discussion_r195124501
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
     		}
     	}
     
    +	/**
    +	 * Test set user code ClassLoader before calling ProcessingTimeCallback.
    +	 */
    +	@Test
    +	public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
    +		syncLatch = new OneShotLatch();
    +
    +		try (MockEnvironment mockEnvironment =
    +			new MockEnvironmentBuilder()
    +				.setUserCodeClassLoader(new TestUserCodeClassLoader())
    +				.build()) {
    +			TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
    +
    +			final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
    +
    +			CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
    +				() -> {
    +					try {
    +						timerServiceTask.invoke();
    +					} catch (Exception e) {
    +						atomicThrowable.set(e);
    +					}
    +				},
    +				TestingUtils.defaultExecutor());
    +
    +			// wait until the invoke is complete
    +			invokeFuture.get();
    +
    +			assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
    +			assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
    +
    +			// check if an exception occurred
    --- End diff --
    
    hold on, will fix it~


---

[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6081


---