You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:15 UTC
[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase
to flip6
[FLINK-8961][tests] Port JobRetrievalITCase to flip6
This closes #5730.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2266eb01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2266eb01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2266eb01
Branch: refs/heads/master
Commit: 2266eb010b377450aa1f01ec589fe8758e9a0c6d
Parents: b1f3ca3
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 15:19:47 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:17:53 2018 +0200
----------------------------------------------------------------------
.../test/example/client/JobRetrievalITCase.java | 121 +++++++-------
.../client/LegacyJobRetrievalITCase.java | 162 +++++++++++++++++++
2 files changed, 224 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2266eb01/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 57198c0..6b747e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -21,30 +21,27 @@ package org.apache.flink.test.example.client;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import java.util.Optional;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@@ -52,23 +49,41 @@ import static org.junit.Assert.fail;
/**
* Tests retrieval of a job from a running Flink cluster.
*/
+@Category(New.class)
public class JobRetrievalITCase extends TestLogger {
private static final Semaphore lock = new Semaphore(1);
- private static FlinkMiniCluster cluster;
-
- @BeforeClass
- public static void before() {
- Configuration configuration = new Configuration();
- cluster = new TestingCluster(configuration, false);
- cluster.start();
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ new Configuration(),
+ 1,
+ 4
+ ),
+ MiniClusterResource.MiniClusterType.NEW
+ );
+
+ private RestClusterClient<StandaloneClusterId> client;
+
+ @Before
+ public void setUp() throws Exception {
+ final Configuration clientConfig = new Configuration();
+ clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+ clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
+ clientConfig.addAll(CLUSTER.getClientConfiguration());
+
+ client = new RestClusterClient<>(
+ clientConfig,
+ StandaloneClusterId.getInstance()
+ );
}
- @AfterClass
- public static void after() {
- cluster.stop();
- cluster = null;
+ @After
+ public void tearDown() {
+ if (client != null) {
+ client.shutdown();
+ }
}
@Test
@@ -80,64 +95,52 @@ public class JobRetrievalITCase extends TestLogger {
final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
- final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
-
// acquire the lock to make sure that the job cannot complete until the job client
// has been attached in resumingThread
lock.acquire();
- client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread resumingThread = new Thread(new Runnable() {
+ client.setDetached(true);
+ client.submitJob(jobGraph, JobRetrievalITCase.class.getClassLoader());
+
+ final CheckedThread resumingThread = new CheckedThread("Flink-Job-Retriever") {
@Override
- public void run() {
- try {
- assertNotNull(client.retrieveJob(jobID));
- } catch (Throwable e) {
- error.set(e);
- }
+ public void go() throws Exception {
+ assertNotNull(client.requestJobResult(jobID).get());
}
- }, "Flink-Job-Retriever");
-
- final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
- final ActorSystem actorSystem = actorSystemSeq.last();
- JavaTestKit testkit = new JavaTestKit(actorSystem);
+ };
- final ActorRef jm = cluster.getJobManagersAsJava().get(0);
- // wait until client connects
- jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
- // confirm registration
- testkit.expectMsgEquals(true);
+ // wait until the job is running
+ while (client.listJobs().get().isEmpty()) {
+ Thread.sleep(50);
+ }
// kick off resuming
resumingThread.start();
// wait for client to connect
- testkit.expectMsgAllOf(
- TestingJobManagerMessages.getClientConnected(),
- TestingJobManagerMessages.getClassLoadingPropsDelivered());
+ while (resumingThread.getState() != Thread.State.WAITING) {
+ Thread.sleep(10);
+ }
// client has connected, we can release the lock
lock.release();
- resumingThread.join();
-
- Throwable exception = error.get();
- if (exception != null) {
- throw new AssertionError(exception);
- }
+ resumingThread.sync();
}
@Test
public void testNonExistingJobRetrieval() throws Exception {
final JobID jobID = new JobID();
- ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration());
try {
- client.retrieveJob(jobID);
+ client.requestJobResult(jobID).get();
fail();
- } catch (JobRetrievalException ignored) {
- // this is what we want
+ } catch (Exception exception) {
+ Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(exception,
+ candidate -> candidate.getMessage() != null && candidate.getMessage().contains("Could not find Flink job"));
+ if (!expectedCause.isPresent()) {
+ throw exception;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2266eb01/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
new file mode 100644
index 0000000..174c90e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests retrieval of a job from a running Flink cluster (legacy {@link TestingCluster} / {@link StandaloneClusterClient} code path).
+ */
+public class LegacyJobRetrievalITCase extends TestLogger {
+
+ private static final Semaphore lock = new Semaphore(1); // single permit; SemaphoreInvokable blocks on it while the test holds it
+
+ private static FlinkMiniCluster cluster; // shared by all tests; started in before(), stopped in after()
+
+ @BeforeClass
+ public static void before() {
+ Configuration configuration = new Configuration();
+ cluster = new TestingCluster(configuration, false);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void after() {
+ cluster.stop();
+ cluster = null;
+ }
+
+ @Test
+ public void testJobRetrieval() throws Exception {
+ final JobID jobID = new JobID();
+
+ final JobVertex imalock = new JobVertex("imalock");
+ imalock.setInvokableClass(SemaphoreInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
+
+ final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true); // NOTE(review): client is never shut down in this test - confirm whether that is intended
+
+ // acquire the lock to make sure that the job cannot complete until the job client
+ // has been attached in resumingThread
+ lock.acquire();
+ client.runDetached(jobGraph, LegacyJobRetrievalITCase.class.getClassLoader());
+ final AtomicReference<Throwable> error = new AtomicReference<>(); // holds any Throwable caught inside resumingThread
+
+ final Thread resumingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertNotNull(client.retrieveJob(jobID));
+ } catch (Throwable e) {
+ error.set(e);
+ }
+ }
+ }, "Flink-Job-Retriever");
+
+ final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
+ final ActorSystem actorSystem = actorSystemSeq.last();
+ JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+ final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+ // ask the JobManager to notify the test kit once a client connects
+ jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
+ // confirm registration
+ testkit.expectMsgEquals(true);
+
+ // kick off resuming
+ resumingThread.start();
+
+ // wait for client to connect
+ testkit.expectMsgAllOf(
+ TestingJobManagerMessages.getClientConnected(),
+ TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
+ // client has connected, we can release the lock
+ lock.release();
+
+ resumingThread.join();
+
+ Throwable exception = error.get();
+ if (exception != null) {
+ throw new AssertionError(exception); // re-raise the retriever thread's failure on the test thread
+ }
+ }
+
+ @Test
+ public void testNonExistingJobRetrieval() throws Exception {
+ final JobID jobID = new JobID(); // fresh ID that this test never submits to the cluster
+ ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration()); // NOTE(review): not shut down here either - confirm
+
+ try {
+ client.retrieveJob(jobID);
+ fail(); // reaching this line means retrieveJob did not throw for the unknown job
+ } catch (JobRetrievalException ignored) {
+ // this is what we want
+ }
+ }
+
+ /**
+ * Invokable that waits on {@link #lock} to be released and finishes afterwards.
+ *
+ * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with this!
+ */
+ public static class SemaphoreInvokable extends AbstractInvokable {
+
+ public SemaphoreInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ lock.acquire(); // blocks for as long as testJobRetrieval holds the single permit
+ lock.release(); // return the permit immediately so the semaphore ends up available again
+ }
+ }
+
+}