You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:26 UTC
[6/9] flink git commit: [FLINK-8704][tests] Port ClassLoaderITCase to flip6
[FLINK-8704][tests] Port ClassLoaderITCase to flip6
This closes #5780.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/674a2d34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/674a2d34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/674a2d34
Branch: refs/heads/master
Commit: 674a2d3464cc5628961f3b9628713b9515ba6c6f
Parents: 420190d
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 11:49:48 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200
----------------------------------------------------------------------
.../test/classloading/ClassLoaderITCase.java | 159 ++++----
.../classloading/LegacyClassLoaderITCase.java | 399 +++++++++++++++++++
2 files changed, 472 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/674a2d34/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index b357904..089ade4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -19,35 +19,35 @@
package org.apache.flink.test.classloading;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -56,25 +56,22 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
/**
* Test job classloader.
*/
+@Category(New.class)
public class ClassLoaderITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
@@ -95,22 +92,21 @@ public class ClassLoaderITCase extends TestLogger {
private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
- @ClassRule
- public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+ private static final TemporaryFolder FOLDER = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
- private static TestingCluster testCluster;
+ private static MiniCluster testCluster;
- private static int parallelism;
+ private static final int parallelism = 4;
@BeforeClass
public static void setUp() throws Exception {
+ FOLDER.create();
+
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- parallelism = 4;
// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
@@ -121,16 +117,29 @@ public class ClassLoaderITCase extends TestLogger {
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
- testCluster = new TestingCluster(config, false);
+ // required as we otherwise run out of memory
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80);
+
+ testCluster = new MiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .setNumTaskManagers(2)
+ .setNumSlotsPerTaskManager(2)
+ .setConfiguration(config)
+ .build()
+ );
testCluster.start();
}
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDownClass() throws Exception {
if (testCluster != null) {
- testCluster.stop();
+ testCluster.close();
}
+ FOLDER.delete();
+ }
+ @After
+ public void tearDown() throws Exception {
TestStreamEnvironment.unsetAsContext();
TestEnvironment.unsetAsContext();
}
@@ -202,15 +211,27 @@ public class ClassLoaderITCase extends TestLogger {
Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
Collections.<URL>emptyList());
- // Program should terminate with a 'SuccessException':
- // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
- expectedException.expectCause(
- Matchers.<Throwable>hasProperty("cause",
- hasProperty("class",
- hasProperty("canonicalName", equalTo(
- "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
-
- streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ try {
+ streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ } catch (Exception e) {
+ // Program should terminate with a 'SuccessException':
+ // the exception class is contained in the user-jar, but is not present on the maven classpath
+ // the deserialization of the exception should thus fail here
+ try {
+ Optional<Throwable> exception = ExceptionUtils.findThrowable(e,
+ candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
+
+ // if we reach this point we either failed due to another exception,
+ // or the deserialization of the user-exception did not fail
+ if (!exception.isPresent()) {
+ throw e;
+ } else {
+ Assert.fail("Deserialization of user exception should have failed.");
+ }
+ } catch (NoClassDefFoundError expected) {
+ // expected
+ }
+ }
}
@Test
@@ -234,15 +255,7 @@ public class ClassLoaderITCase extends TestLogger {
@Test
public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
- int port = testCluster.getLeaderRPCPort();
-
- // test FLINK-3633
- final PackagedProgram userCodeTypeProg = new PackagedProgram(
- new File(USERCODETYPE_JAR_PATH),
- new String[] { USERCODETYPE_JAR_PATH,
- "localhost",
- String.valueOf(port),
- });
+ PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH));
TestEnvironment.setAsContext(
testCluster,
@@ -282,6 +295,8 @@ public class ClassLoaderITCase extends TestLogger {
*/
@Test
public void testDisposeSavepointWithCustomKvState() throws Exception {
+ ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), testCluster);
+
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
File checkpointDir = FOLDER.newFolder();
@@ -324,19 +339,18 @@ public class ClassLoaderITCase extends TestLogger {
// The job ID
JobID jobId = null;
- ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
-
LOG.info("Waiting for job status running.");
// Wait for running job
while (jobId == null && deadline.hasTimeLeft()) {
- Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
- RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
- for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
- jobId = runningJob.getJobId();
- LOG.info("Job running. ID: " + jobId);
- break;
+ Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ for (JobStatusMessage job : jobs) {
+ if (job.getJobState() == JobStatus.RUNNING) {
+ jobId = job.getJobId();
+ LOG.info("Job running. ID: " + jobId);
+ break;
+ }
}
// Retry if job is not available yet
@@ -345,52 +359,25 @@ public class ClassLoaderITCase extends TestLogger {
}
}
- LOG.info("Wait for all tasks to be running.");
- Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
- Await.ready(allRunning, deadline.timeLeft());
- LOG.info("All tasks are running.");
-
// Trigger savepoint
String savepointPath = null;
for (int i = 0; i < 20; i++) {
LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
- Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
- Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
-
- if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
- savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
- LOG.info("Triggered savepoint. Path: " + savepointPath);
- } else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
- Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+ try {
+ savepointPath = clusterClient.triggerSavepoint(jobId, null)
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ } catch (Exception cause) {
LOG.info("Failed to trigger savepoint. Retrying...", cause);
// This can fail if the operators are not opened yet
Thread.sleep(500);
- } else {
- throw new IllegalStateException("Unexpected response to TriggerSavepoint");
}
}
assertNotNull("Failed to trigger savepoint", savepointPath);
- // Dispose savepoint
- LOG.info("Disposing savepoint at " + savepointPath);
- Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
- Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
-
- if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
- // Success :-)
- LOG.info("Disposed savepoint at " + savepointPath);
- } else if (disposeResponse instanceof DisposeSavepointFailure) {
- throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
- } else {
- throw new IllegalStateException("Unexpected response to DisposeSavepoint");
- }
+ clusterClient.disposeSavepoint(savepointPath).get();
- // Cancel job, wait for success
- Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
- Object response = Await.result(cancelFuture, deadline.timeLeft());
- assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+ clusterClient.cancel(jobId);
// make sure, the execution is finished to not influence other test methods
invokeThread.join(deadline.timeLeft().toMillis());
http://git-wip-us.apache.org/repos/asf/flink/blob/674a2d34/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
new file mode 100644
index 0000000..fa114e1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests job class loading with user-code jars on the legacy {@link TestingCluster}.
+ */
+public class LegacyClassLoaderITCase extends TestLogger {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LegacyClassLoaderITCase.class);
+
+ private static final String INPUT_SPLITS_PROG_JAR_FILE = "customsplit-test-jar.jar";
+
+ private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "streaming-customsplit-test-jar.jar";
+
+ private static final String STREAMING_PROG_JAR_FILE = "streamingclassloader-test-jar.jar";
+
+ private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "streaming-checkpointed-classloader-test-jar.jar";
+
+ private static final String KMEANS_JAR_PATH = "kmeans-test-jar.jar";
+
+ private static final String USERCODETYPE_JAR_PATH = "usercodetype-test-jar.jar";
+
+ private static final String CUSTOM_KV_STATE_JAR_PATH = "custom_kv_state-test-jar.jar";
+
+ private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
+
+ @ClassRule
+ public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private static TestingCluster testCluster;
+
+ private static int parallelism;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ parallelism = 4;
+
+ // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
+ config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+ config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+ // Savepoint path
+ config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
+ FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+ testCluster = new TestingCluster(config, false);
+ testCluster.start();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (testCluster != null) {
+ testCluster.stop();
+ }
+
+ TestStreamEnvironment.unsetAsContext();
+ TestEnvironment.unsetAsContext();
+ }
+
+ @Test
+ public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
+
+ PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ inputSplitTestProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
+ URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+ PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.<Path>emptyList(),
+ Collections.singleton(classpath));
+
+ inputSplitTestProg2.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ // regular streaming job
+ PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ streamingProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ // checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+ // the test also ensures that user-specific exceptions are serializable between JobManager <--> JobClient.
+ PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ // Program should terminate with a 'SuccessException':
+ // we cannot access the SuccessException here when executing the tests with maven, because it's not available in the jar.
+ expectedException.expectCause(
+ Matchers.<Throwable>hasProperty("cause",
+ hasProperty("class",
+ hasProperty("canonicalName", equalTo(
+ "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
+
+ streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ PackagedProgram kMeansProg = new PackagedProgram(
+ new File(KMEANS_JAR_PATH),
+ new String[] {
+ KMeansData.DATAPOINTS,
+ KMeansData.INITIAL_CENTERS,
+ "25"
+ });
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(KMEANS_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ kMeansProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ int port = testCluster.getLeaderRPCPort();
+
+ // test FLINK-3633
+ final PackagedProgram userCodeTypeProg = new PackagedProgram(
+ new File(USERCODETYPE_JAR_PATH),
+ new String[] { USERCODETYPE_JAR_PATH,
+ "localhost",
+ String.valueOf(port),
+ });
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ userCodeTypeProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ File checkpointDir = FOLDER.newFolder();
+ File outputDir = FOLDER.newFolder();
+
+ final PackagedProgram program = new PackagedProgram(
+ new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
+ new String[] {
+ checkpointDir.toURI().toString(),
+ outputDir.toURI().toString()
+ });
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ expectedException.expectCause(
+ Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
+
+ program.invokeInteractiveModeForExecution();
+ }
+
+ /**
+ * Tests disposal of a savepoint, which contains custom user code KvState.
+ */
+ @Test
+ public void testDisposeSavepointWithCustomKvState() throws Exception {
+ Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
+
+ File checkpointDir = FOLDER.newFolder();
+ File outputDir = FOLDER.newFolder();
+
+ final PackagedProgram program = new PackagedProgram(
+ new File(CUSTOM_KV_STATE_JAR_PATH),
+ new String[] {
+ String.valueOf(parallelism),
+ checkpointDir.toURI().toString(),
+ "5000",
+ outputDir.toURI().toString()
+ });
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList()
+ );
+
+ // Execute detached
+ Thread invokeThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ program.invokeInteractiveModeForExecution();
+ } catch (ProgramInvocationException ignored) {
+ if (ignored.getCause() == null ||
+ !(ignored.getCause() instanceof JobCancellationException)) {
+ ignored.printStackTrace();
+ }
+ }
+ }
+ });
+
+ LOG.info("Starting program invoke thread");
+ invokeThread.start();
+
+ // The job ID
+ JobID jobId = null;
+
+ ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
+
+ LOG.info("Waiting for job status running.");
+
+ // Wait for running job
+ while (jobId == null && deadline.hasTimeLeft()) {
+ Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
+ RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
+
+ for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
+ jobId = runningJob.getJobId();
+ LOG.info("Job running. ID: " + jobId);
+ break;
+ }
+
+ // Retry if job is not available yet
+ if (jobId == null) {
+ Thread.sleep(100L);
+ }
+ }
+
+ LOG.info("Wait for all tasks to be running.");
+ Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
+ Await.ready(allRunning, deadline.timeLeft());
+ LOG.info("All tasks are running.");
+
+ // Trigger savepoint
+ String savepointPath = null;
+ for (int i = 0; i < 20; i++) {
+ LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
+ Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+
+ Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
+
+ if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
+ savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
+ LOG.info("Triggered savepoint. Path: " + savepointPath);
+ } else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
+ Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+ LOG.info("Failed to trigger savepoint. Retrying...", cause);
+ // This can fail if the operators are not opened yet
+ Thread.sleep(500);
+ } else {
+ throw new IllegalStateException("Unexpected response to TriggerSavepoint");
+ }
+ }
+
+ assertNotNull("Failed to trigger savepoint", savepointPath);
+
+ // Dispose savepoint
+ LOG.info("Disposing savepoint at " + savepointPath);
+ Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+ Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
+
+ if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
+ // Success :-)
+ LOG.info("Disposed savepoint at " + savepointPath);
+ } else if (disposeResponse instanceof DisposeSavepointFailure) {
+ throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
+ } else {
+ throw new IllegalStateException("Unexpected response to DisposeSavepoint");
+ }
+
+ // Cancel job, wait for success
+ Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
+ Object response = Await.result(cancelFuture, deadline.timeLeft());
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+ // make sure the execution is finished so that it does not influence other test methods
+ invokeThread.join(deadline.timeLeft().toMillis());
+ assertFalse("Program invoke thread still running", invokeThread.isAlive());
+ }
+}