You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:26 UTC

[6/9] flink git commit: [FLINK-8704][tests] Port ClassLoaderITCase to flip6

[FLINK-8704][tests] Port ClassLoaderITCase to flip6

This closes #5780.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/674a2d34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/674a2d34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/674a2d34

Branch: refs/heads/master
Commit: 674a2d3464cc5628961f3b9628713b9515ba6c6f
Parents: 420190d
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 11:49:48 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../test/classloading/ClassLoaderITCase.java    | 159 ++++----
 .../classloading/LegacyClassLoaderITCase.java   | 399 +++++++++++++++++++
 2 files changed, 472 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/674a2d34/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index b357904..089ade4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -19,35 +19,35 @@
 package org.apache.flink.test.classloading;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -56,25 +56,22 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasProperty;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Test job classloader.
  */
+@Category(New.class)
 public class ClassLoaderITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
@@ -95,22 +92,21 @@ public class ClassLoaderITCase extends TestLogger {
 
 	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
 
-	@ClassRule
-	public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+	private static final TemporaryFolder FOLDER = new TemporaryFolder();
 
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
-	private static TestingCluster testCluster;
+	private static MiniCluster testCluster;
 
-	private static int parallelism;
+	private static final int parallelism = 4;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
+		FOLDER.create();
+
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-		parallelism = 4;
 
 		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
 		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
@@ -121,16 +117,29 @@ public class ClassLoaderITCase extends TestLogger {
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
-		testCluster = new TestingCluster(config, false);
+		// required as we otherwise run out of memory
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80);
+
+		testCluster = new MiniCluster(
+			new MiniClusterConfiguration.Builder()
+				.setNumTaskManagers(2)
+				.setNumSlotsPerTaskManager(2)
+				.setConfiguration(config)
+			.build()
+		);
 		testCluster.start();
 	}
 
 	@AfterClass
-	public static void tearDown() throws Exception {
+	public static void tearDownClass() throws Exception {
 		if (testCluster != null) {
-			testCluster.stop();
+			testCluster.close();
 		}
+		FOLDER.delete();
+	}
 
+	@After
+	public void tearDown() throws Exception {
 		TestStreamEnvironment.unsetAsContext();
 		TestEnvironment.unsetAsContext();
 	}
@@ -202,15 +211,27 @@ public class ClassLoaderITCase extends TestLogger {
 			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
 			Collections.<URL>emptyList());
 
-		// Program should terminate with a 'SuccessException':
-		// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
-		expectedException.expectCause(
-			Matchers.<Throwable>hasProperty("cause",
-				hasProperty("class",
-					hasProperty("canonicalName", equalTo(
-						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
-
-		streamingCheckpointedProg.invokeInteractiveModeForExecution();
+		try {
+			streamingCheckpointedProg.invokeInteractiveModeForExecution();
+		} catch (Exception e) {
+			// Program should terminate with a 'SuccessException':
+			// the exception class is contained in the user-jar, but is not present on the maven classpath
+			// the deserialization of the exception should thus fail here
+			try {
+				Optional<Throwable> exception = ExceptionUtils.findThrowable(e,
+					candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
+
+				// if we reach this point we either failed due to another exception,
+				// or the deserialization of the user-exception did not fail
+				if (!exception.isPresent()) {
+					throw e;
+				} else {
+					Assert.fail("Deserialization of user exception should have failed.");
+				}
+			} catch (NoClassDefFoundError expected) {
+				// expected
+			}
+		}
 	}
 
 	@Test
@@ -234,15 +255,7 @@ public class ClassLoaderITCase extends TestLogger {
 
 	@Test
 	public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		int port = testCluster.getLeaderRPCPort();
-
-		// test FLINK-3633
-		final PackagedProgram userCodeTypeProg = new PackagedProgram(
-			new File(USERCODETYPE_JAR_PATH),
-			new String[] { USERCODETYPE_JAR_PATH,
-				"localhost",
-				String.valueOf(port),
-			});
+		PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH));
 
 		TestEnvironment.setAsContext(
 			testCluster,
@@ -282,6 +295,8 @@ public class ClassLoaderITCase extends TestLogger {
 	 */
 	@Test
 	public void testDisposeSavepointWithCustomKvState() throws Exception {
+		ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), testCluster);
+
 		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
 
 		File checkpointDir = FOLDER.newFolder();
@@ -324,19 +339,18 @@ public class ClassLoaderITCase extends TestLogger {
 		// The job ID
 		JobID jobId = null;
 
-		ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
-
 		LOG.info("Waiting for job status running.");
 
 		// Wait for running job
 		while (jobId == null && deadline.hasTimeLeft()) {
-			Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
-			RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
 
-			for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
-				jobId = runningJob.getJobId();
-				LOG.info("Job running. ID: " + jobId);
-				break;
+			Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			for (JobStatusMessage job : jobs) {
+				if (job.getJobState() == JobStatus.RUNNING) {
+					jobId = job.getJobId();
+					LOG.info("Job running. ID: " + jobId);
+					break;
+				}
 			}
 
 			// Retry if job is not available yet
@@ -345,52 +359,25 @@ public class ClassLoaderITCase extends TestLogger {
 			}
 		}
 
-		LOG.info("Wait for all tasks to be running.");
-		Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
-		Await.ready(allRunning, deadline.timeLeft());
-		LOG.info("All tasks are running.");
-
 		// Trigger savepoint
 		String savepointPath = null;
 		for (int i = 0; i < 20; i++) {
 			LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
-			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
-			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
-
-			if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
-				savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
-				LOG.info("Triggered savepoint. Path: " + savepointPath);
-			} else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
-				Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+			try {
+				savepointPath = clusterClient.triggerSavepoint(jobId, null)
+					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			} catch (Exception cause) {
 				LOG.info("Failed to trigger savepoint. Retrying...", cause);
 				// This can fail if the operators are not opened yet
 				Thread.sleep(500);
-			} else {
-				throw new IllegalStateException("Unexpected response to TriggerSavepoint");
 			}
 		}
 
 		assertNotNull("Failed to trigger savepoint", savepointPath);
 
-		// Dispose savepoint
-		LOG.info("Disposing savepoint at " + savepointPath);
-		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
-		Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
-
-		if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
-			// Success :-)
-			LOG.info("Disposed savepoint at " + savepointPath);
-		} else if (disposeResponse instanceof DisposeSavepointFailure) {
-			throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
-		} else {
-			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
-		}
+		clusterClient.disposeSavepoint(savepointPath).get();
 
-		// Cancel job, wait for success
-		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
-		Object response = Await.result(cancelFuture, deadline.timeLeft());
-		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+		clusterClient.cancel(jobId);
 
 		// make sure, the execution is finished to not influence other test methods
 		invokeThread.join(deadline.timeLeft().toMillis());

http://git-wip-us.apache.org/repos/asf/flink/blob/674a2d34/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
new file mode 100644
index 0000000..fa114e1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test job classloader.
+ */
+public class LegacyClassLoaderITCase extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LegacyClassLoaderITCase.class);
+
+	private static final String INPUT_SPLITS_PROG_JAR_FILE = "customsplit-test-jar.jar";
+
+	private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "streaming-customsplit-test-jar.jar";
+
+	private static final String STREAMING_PROG_JAR_FILE = "streamingclassloader-test-jar.jar";
+
+	private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "streaming-checkpointed-classloader-test-jar.jar";
+
+	private static final String KMEANS_JAR_PATH = "kmeans-test-jar.jar";
+
+	private static final String USERCODETYPE_JAR_PATH = "usercodetype-test-jar.jar";
+
+	private static final String CUSTOM_KV_STATE_JAR_PATH = "custom_kv_state-test-jar.jar";
+
+	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
+
+	@ClassRule
+	public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	private static TestingCluster testCluster;
+
+	private static int parallelism;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		parallelism = 4;
+
+		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
+		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+		// Savepoint path
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
+				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+		testCluster = new TestingCluster(config, false);
+		testCluster.start();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testCluster != null) {
+			testCluster.stop();
+		}
+
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+	}
+
+	@Test
+	public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
+
+		PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		inputSplitTestProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
+		URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+		PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.<Path>emptyList(),
+			Collections.singleton(classpath));
+
+		inputSplitTestProg2.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		// regular streaming job
+		PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		streamingProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+		// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
+		PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		// Program should terminate with a 'SuccessException':
+		// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
+		expectedException.expectCause(
+			Matchers.<Throwable>hasProperty("cause",
+				hasProperty("class",
+					hasProperty("canonicalName", equalTo(
+						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
+
+		streamingCheckpointedProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		PackagedProgram kMeansProg = new PackagedProgram(
+			new File(KMEANS_JAR_PATH),
+			new String[] {
+				KMeansData.DATAPOINTS,
+				KMeansData.INITIAL_CENTERS,
+				"25"
+			});
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(KMEANS_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		kMeansProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		int port = testCluster.getLeaderRPCPort();
+
+		// test FLINK-3633
+		final PackagedProgram userCodeTypeProg = new PackagedProgram(
+			new File(USERCODETYPE_JAR_PATH),
+			new String[] { USERCODETYPE_JAR_PATH,
+				"localhost",
+				String.valueOf(port),
+			});
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		userCodeTypeProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		File checkpointDir = FOLDER.newFolder();
+		File outputDir = FOLDER.newFolder();
+
+		final PackagedProgram program = new PackagedProgram(
+			new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
+			new String[] {
+				checkpointDir.toURI().toString(),
+				outputDir.toURI().toString()
+			});
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		expectedException.expectCause(
+			Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
+
+		program.invokeInteractiveModeForExecution();
+	}
+
+	/**
+	 * Tests disposal of a savepoint, which contains custom user code KvState.
+	 */
+	@Test
+	public void testDisposeSavepointWithCustomKvState() throws Exception {
+		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
+
+		File checkpointDir = FOLDER.newFolder();
+		File outputDir = FOLDER.newFolder();
+
+		final PackagedProgram program = new PackagedProgram(
+				new File(CUSTOM_KV_STATE_JAR_PATH),
+				new String[] {
+						String.valueOf(parallelism),
+						checkpointDir.toURI().toString(),
+						"5000",
+						outputDir.toURI().toString()
+				});
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
+			Collections.<URL>emptyList()
+		);
+
+		// Execute detached
+		Thread invokeThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					program.invokeInteractiveModeForExecution();
+				} catch (ProgramInvocationException ignored) {
+					if (ignored.getCause() == null ||
+						!(ignored.getCause() instanceof JobCancellationException)) {
+						ignored.printStackTrace();
+					}
+				}
+			}
+		});
+
+		LOG.info("Starting program invoke thread");
+		invokeThread.start();
+
+		// The job ID
+		JobID jobId = null;
+
+		ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
+
+		LOG.info("Waiting for job status running.");
+
+		// Wait for running job
+		while (jobId == null && deadline.hasTimeLeft()) {
+			Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
+			RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
+
+			for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
+				jobId = runningJob.getJobId();
+				LOG.info("Job running. ID: " + jobId);
+				break;
+			}
+
+			// Retry if job is not available yet
+			if (jobId == null) {
+				Thread.sleep(100L);
+			}
+		}
+
+		LOG.info("Wait for all tasks to be running.");
+		Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
+		Await.ready(allRunning, deadline.timeLeft());
+		LOG.info("All tasks are running.");
+
+		// Trigger savepoint
+		String savepointPath = null;
+		for (int i = 0; i < 20; i++) {
+			LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
+			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+
+			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
+
+			if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
+				savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
+				LOG.info("Triggered savepoint. Path: " + savepointPath);
+			} else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
+				Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+				LOG.info("Failed to trigger savepoint. Retrying...", cause);
+				// This can fail if the operators are not opened yet
+				Thread.sleep(500);
+			} else {
+				throw new IllegalStateException("Unexpected response to TriggerSavepoint");
+			}
+		}
+
+		assertNotNull("Failed to trigger savepoint", savepointPath);
+
+		// Dispose savepoint
+		LOG.info("Disposing savepoint at " + savepointPath);
+		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+		Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
+
+		if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
+			// Success :-)
+			LOG.info("Disposed savepoint at " + savepointPath);
+		} else if (disposeResponse instanceof DisposeSavepointFailure) {
+			throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
+		} else {
+			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
+		}
+
+		// Cancel job, wait for success
+		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
+		Object response = Await.result(cancelFuture, deadline.timeLeft());
+		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+		// make sure, the execution is finished to not influence other test methods
+		invokeThread.join(deadline.timeLeft().toMillis());
+		assertFalse("Program invoke thread still running", invokeThread.isAlive());
+	}
+}