You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/06/29 12:31:09 UTC

flink git commit: [FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback

Repository: flink
Updated Branches:
  refs/heads/master 2c13e00c1 -> 7ea4e298a


[FLINK-8067] User code ClassLoader not set before calling ProcessingTimeCallback

This closes #6081


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ea4e298
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ea4e298
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ea4e298

Branch: refs/heads/master
Commit: 7ea4e298aeec8548fb7a3c1e6e259440bcf65993
Parents: 2c13e00
Author: yanghua <ya...@gmail.com>
Authored: Sat May 26 18:44:21 2018 +0800
Committer: Piotr Nowojski <pi...@gmail.com>
Committed: Fri Jun 29 14:30:53 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 87 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ea4e298/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 629dcd9..41257ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -258,8 +258,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 			// if the clock is not already set, then assign a default TimeServiceProvider
 			if (timerService == null) {
-				ThreadFactory timerThreadFactory =
-					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
+				ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
+					"Time Trigger for " + getName(), getUserCodeClassLoader());
 
 				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ea4e298/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1a4ce31..6a9dda4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -133,6 +133,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -142,8 +143,13 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -806,6 +812,36 @@ public class StreamTaskTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the user code ClassLoader is set as the timer thread's context ClassLoader before the ProcessingTimeCallback is called.
+	 */
+	@Test
+	public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
+		syncLatch = new OneShotLatch();
+
+		try (MockEnvironment mockEnvironment =
+			new MockEnvironmentBuilder()
+				.setUserCodeClassLoader(new TestUserCodeClassLoader())
+				.build()) {
+			TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment);
+
+			CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+				() -> {
+					try {
+						timerServiceTask.invoke();
+					} catch (Exception e) {
+						throw new CompletionException(e);
+					}
+				},
+				TestingUtils.defaultExecutor());
+
+			invokeFuture.get();
+
+			assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
+			assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------
@@ -1239,6 +1275,57 @@ public class StreamTaskTest extends TestLogger {
 
 	}
 
+	/**
+	 * A task that registers a processing time service callback.
+	 */
+	public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+		private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
+
+		public TimeServiceTask(Environment env) {
+			super(env, null);
+		}
+
+		public List<ClassLoader> getClassLoaders() {
+			return classLoaders;
+		}
+
+		@Override
+		protected void init() throws Exception {
+			getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+					classLoaders.add(Thread.currentThread().getContextClassLoader());
+					syncLatch.trigger();
+				}
+			});
+		}
+
+		@Override
+		protected void run() throws Exception {
+			syncLatch.await();
+		}
+
+		@Override
+		protected void cleanup() throws Exception {
+
+		}
+
+		@Override
+		protected void cancelTask() throws Exception {
+
+		}
+	}
+
+	/**
+	 * A {@link ClassLoader} that delegates everything to {@link ClassLoader#getSystemClassLoader()}.
+	 */
+	private static class TestUserCodeClassLoader extends ClassLoader {
+		public TestUserCodeClassLoader() {
+			super(ClassLoader.getSystemClassLoader());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// ------------------------------------------------------------------------