You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/06/29 12:31:09 UTC
flink git commit: [FLINK-8067] User code ClassLoader not set before
calling ProcessingTimeCallback
Repository: flink
Updated Branches:
refs/heads/master 2c13e00c1 -> 7ea4e298a
[FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback
This closes #6081
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ea4e298
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ea4e298
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ea4e298
Branch: refs/heads/master
Commit: 7ea4e298aeec8548fb7a3c1e6e259440bcf65993
Parents: 2c13e00
Author: yanghua <ya...@gmail.com>
Authored: Sat May 26 18:44:21 2018 +0800
Committer: Piotr Nowojski <pi...@gmail.com>
Committed: Fri Jun 29 14:30:53 2018 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 87 ++++++++++++++++++++
2 files changed, 89 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7ea4e298/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 629dcd9..41257ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -258,8 +258,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
- ThreadFactory timerThreadFactory =
- new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
+ ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
+ "Time Trigger for " + getName(), getUserCodeClassLoader());
timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7ea4e298/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1a4ce31..6a9dda4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -133,6 +133,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -142,8 +143,13 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -806,6 +812,36 @@ public class StreamTaskTest extends TestLogger {
}
}
+ /**
+ * Tests that the user code ClassLoader is set before calling ProcessingTimeCallback.
+ */
+ @Test
+ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+ syncLatch = new OneShotLatch();
+
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setUserCodeClassLoader(new TestUserCodeClassLoader())
+ .build()) {
+ TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+ CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+ () -> {
+ try {
+ timerServiceTask.invoke();
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ },
+ TestingUtils.defaultExecutor());
+
+ invokeFuture.get();
+
+ assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
+ assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
+ }
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
@@ -1239,6 +1275,57 @@ public class StreamTaskTest extends TestLogger {
}
+ /**
+ * A task that registers a processing time service callback.
+ */
+ public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+ private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
+
+ public TimeServiceTask(Environment env) {
+ super(env, null);
+ }
+
+ public List<ClassLoader> getClassLoaders() {
+ return classLoaders;
+ }
+
+ @Override
+ protected void init() throws Exception {
+ getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ classLoaders.add(Thread.currentThread().getContextClassLoader());
+ syncLatch.trigger();
+ }
+ });
+ }
+
+ @Override
+ protected void run() throws Exception {
+ syncLatch.await();
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+
+ }
+
+ @Override
+ protected void cancelTask() throws Exception {
+
+ }
+ }
+
+ /**
+ * A {@link ClassLoader} that delegates everything to {@link ClassLoader#getSystemClassLoader()}.
+ */
+ private static class TestUserCodeClassLoader extends ClassLoader {
+ public TestUserCodeClassLoader() {
+ super(ClassLoader.getSystemClassLoader());
+ }
+ }
+
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------