You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/05/26 10:47:39 UTC
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
GitHub user yanghua opened a pull request:
https://github.com/apache/flink/pull/6081
[FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback
## What is the purpose of the change
*This pull request sets the user code class loader for the thread factory*
## Brief change log
- *Sets the user code class loader for the thread factory*
## Verifying this change
This change is already covered by existing tests*.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yanghua/flink FLINK-8067
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6081.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6081
----
commit 2f11cabedc62ceded05a2ef02b88abfc79a77ba1
Author: yanghua <ya...@...>
Date: 2018-05-26T10:44:21Z
[FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback
----
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/6081
cc @pnowojski does this PR look good to you?
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/6081
cc @pnowojski please review this, thanks~
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/6081
@pnowojski test error has fixed~
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/6081
Thanks for the contribution! Merged.
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195012204
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -1239,6 +1286,50 @@ protected void cancelTask() throws Exception {
}
+ /**
+ * A task that register a processing time service callback.
+ */
+ public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+ public TimeServiceTask(Environment env) {
+ super(env, null);
+ }
+
+ @Override
+ protected void init() throws Exception {
+ getProcessingTimeService().registerTimer(1000, new ProcessingTimeCallback() {
--- End diff --
Please change `1000` to `0`. This parameter is not a delay in miliseconds/seconds from now, but specifies exact timestamp when the timer should fire, thus value `1000` is misleading.
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/6081
👍 Yes, I have already approved it on github:
> pnowojski approved these changes 23 hours ago
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195123173
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Test set user code ClassLoader before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ syncLatch = new OneShotLatch();
+
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setUserCodeClassLoader(new TestUserCodeClassLoader())
+ .build()) {
+ TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+ final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+ CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+ () -> {
+ try {
+ timerServiceTask.invoke();
+ } catch (Exception e) {
+ atomicThrowable.set(e);
+ }
+ },
+ TestingUtils.defaultExecutor());
+
+ // wait until the invoke is complete
+ invokeFuture.get();
+
+ assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
+ assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+ // check if an exception occurred
--- End diff --
tiny nits: you could drop this (and the one above) comment. The code is pretty self explanatory on it's own.
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/6081
@yanghua why is it difficult? Can not you create more or less similar test to @GJL's `testSetsUserCodeClassLoader` from here: https://github.com/apache/flink/pull/4980/files. With couple side notes:
1. build `MockEnvironment` using `MockEnvironmentBuilder` with a custom user class loader - do not use mockito for that
2. create a stream task (reusing one of the implementation in `StreamTaskTest.java` or create a new one)
3. register a timer service, wait for it's triggering and assert the user class loader in `ProcessingTimeCallback#onProcessingTime`
---
[GitHub] flink issue #6081: [FLINK-8067] User code ClassLoader not set before calling...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/6081
@pnowojski I've tried but it seams it's hard to do this. The class loader can not be accessed out of the class. @GJL any suggestion? Or need to test?
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195007239
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -806,6 +819,40 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Test set user code ClassLoader before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
--- End diff --
Maybe reformat to:
```
try (MockEnvironment mockEnvironment =
new MockEnvironmentBuilder()
.setUserCodeClassLoader(new TestUserCodeClassLoader())
.build()) {
}
```
?
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195007487
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -806,6 +819,40 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Test set user code ClassLoader before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+ .setUserCodeClassLoader(new TestUserCodeClassLoader()).build()) {
+ TimeServiceTask timerServiceTask = spy(new TimeServiceTask(mockEnvironment));
--- End diff --
why do you need `spy` here?
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195011678
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -1239,6 +1286,50 @@ protected void cancelTask() throws Exception {
}
+ /**
+ * A task that register a processing time service callback.
+ */
+ public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+ public TimeServiceTask(Environment env) {
+ super(env, null);
+ }
+
+ @Override
+ protected void init() throws Exception {
+ getProcessingTimeService().registerTimer(1000, new ProcessingTimeCallback() {
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ classLoaders.add(Thread.currentThread().getContextClassLoader());
+ }
+ });
+ }
+
+ @Override
+ protected void run() throws Exception {
--- End diff --
The test is failing on the travis. Probably because here in `run()` you do not wait for at least one execution of `onProcessingTime(...)`. Probably you need to add a `OneShotLatch` and `tigger` it in `onProcessingTime` while `await`'ing in `run()`.
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195125463
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Test set user code ClassLoader before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ syncLatch = new OneShotLatch();
+
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setUserCodeClassLoader(new TestUserCodeClassLoader())
+ .build()) {
+ TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+ final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+ CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+ () -> {
+ try {
+ timerServiceTask.invoke();
+ } catch (Exception e) {
+ atomicThrowable.set(e);
--- End diff --
you can fail this with a `CompletionException` instead, then we don't need the atomic reference and will fail at `invokeFuture.get`
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195013346
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -169,6 +175,13 @@
private static OneShotLatch syncLatch;
+ private static final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
--- End diff --
maybe move this field to `TimeServiceTask` since it's tightly connected to it.
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6081#discussion_r195124501
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -806,6 +811,44 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable {
}
}
+ /**
+ * Test set user code ClassLoader before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ syncLatch = new OneShotLatch();
+
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setUserCodeClassLoader(new TestUserCodeClassLoader())
+ .build()) {
+ TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+ final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+ CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+ () -> {
+ try {
+ timerServiceTask.invoke();
+ } catch (Exception e) {
+ atomicThrowable.set(e);
+ }
+ },
+ TestingUtils.defaultExecutor());
+
+ // wait until the invoke is complete
+ invokeFuture.get();
+
+ assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
+ assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
+
+ // check if an exception occurred
--- End diff --
hold on, will fix it~
---
[GitHub] flink pull request #6081: [FLINK-8067] User code ClassLoader not set before ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6081
---