You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:15 UTC

[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase to flip6

[FLINK-8961][tests] Port JobRetrievalITCase to flip6

This closes #5730.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2266eb01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2266eb01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2266eb01

Branch: refs/heads/master
Commit: 2266eb010b377450aa1f01ec589fe8758e9a0c6d
Parents: b1f3ca3
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 15:19:47 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:17:53 2018 +0200

----------------------------------------------------------------------
 .../test/example/client/JobRetrievalITCase.java | 121 +++++++-------
 .../client/LegacyJobRetrievalITCase.java        | 162 +++++++++++++++++++
 2 files changed, 224 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2266eb01/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 57198c0..6b747e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -21,30 +21,27 @@ package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -52,23 +49,41 @@ import static org.junit.Assert.fail;
 /**
  * Tests retrieval of a job from a running Flink cluster.
  */
+@Category(New.class)
 public class JobRetrievalITCase extends TestLogger {
 
 	private static final Semaphore lock = new Semaphore(1);
 
-	private static FlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void before() {
-		Configuration configuration = new Configuration();
-		cluster = new TestingCluster(configuration, false);
-		cluster.start();
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			new Configuration(),
+			1,
+			4
+		),
+		MiniClusterResource.MiniClusterType.NEW
+	);
+
+	private RestClusterClient<StandaloneClusterId> client;
+
+	@Before
+	public void setUp() throws Exception {
+		final Configuration restClientConfig = new Configuration();
+		restClientConfig.setLong(RestOptions.RETRY_DELAY, 0);
+		restClientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+		restClientConfig.addAll(CLUSTER.getClientConfiguration());
+
+		client = new RestClusterClient<>(
+			restClientConfig,
+			StandaloneClusterId.getInstance()
+		);
 	}
 
-	@AfterClass
-	public static void after() {
-		cluster.stop();
-		cluster = null;
+	@After
+	public void tearDown() {
+		if (client != null) {
+			client.shutdown();
+		}
 	}
 
 	@Test
@@ -80,64 +95,53 @@ public class JobRetrievalITCase extends TestLogger {
 
 		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
 
-		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
-
 		// acquire the lock to make sure that the job cannot complete until the job client
 		// has been attached in resumingThread
 		lock.acquire();
-		client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader());
-		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-		final Thread resumingThread = new Thread(new Runnable() {
+		client.setDetached(true);
+		client.submitJob(jobGraph, JobRetrievalITCase.class.getClassLoader());
+
+		final CheckedThread resumingThread = new CheckedThread("Flink-Job-Retriever") {
 			@Override
-			public void run() {
-				try {
-					assertNotNull(client.retrieveJob(jobID));
-				} catch (Throwable e) {
-					error.set(e);
-				}
+			public void go() throws Exception {
+				assertNotNull(client.requestJobResult(jobID).get());
 			}
-		}, "Flink-Job-Retriever");
-
-		final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
-		final ActorSystem actorSystem = actorSystemSeq.last();
-		JavaTestKit testkit = new JavaTestKit(actorSystem);
+		};
 
-		final ActorRef jm = cluster.getJobManagersAsJava().get(0);
-		// wait until client connects
-		jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
-		// confirm registration
-		testkit.expectMsgEquals(true);
+		// wait until the job is running
+		while (client.listJobs().get().isEmpty()) {
+			Thread.sleep(50);
+		}
 
 		// kick off resuming
 		resumingThread.start();
 
 		// wait for client to connect
-		testkit.expectMsgAllOf(
-			TestingJobManagerMessages.getClientConnected(),
-			TestingJobManagerMessages.getClassLoadingPropsDelivered());
+		// also stop waiting if the retriever already terminated (e.g. its request failed fast); otherwise this loop would spin forever instead of letting sync() surface the failure
+		while (resumingThread.getState() != Thread.State.WAITING && resumingThread.isAlive()) {
+			Thread.sleep(10);
+		}
 
 		// client has connected, we can release the lock
 		lock.release();
 
-		resumingThread.join();
-
-		Throwable exception = error.get();
-		if (exception != null) {
-			throw new AssertionError(exception);
-		}
+		resumingThread.sync();
 	}
 
 	@Test
 	public void testNonExistingJobRetrieval() throws Exception {
 		final JobID jobID = new JobID();
-		ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration());
 
 		try {
-			client.retrieveJob(jobID);
+			client.requestJobResult(jobID).get();
 			fail();
-		} catch (JobRetrievalException ignored) {
-			// this is what we want
+		} catch (Exception e) {
+			final Optional<Throwable> jobNotFound = ExceptionUtils.findThrowable(e,
+				t -> t.getMessage() != null && t.getMessage().contains("Could not find Flink job"));
+			if (!jobNotFound.isPresent()) {
+				throw e;
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2266eb01/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
new file mode 100644
index 0000000..174c90e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests retrieval of a job from a running legacy (pre-FLIP-6) Flink cluster.
+ */
+public class LegacyJobRetrievalITCase extends TestLogger {
+
+	private static final Semaphore lock = new Semaphore(1);
+
+	private static FlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void before() {
+		Configuration configuration = new Configuration();
+		cluster = new TestingCluster(configuration, false);
+		cluster.start();
+	}
+
+	@AfterClass
+	public static void after() {
+		// guard against before() having failed before the cluster was created
+		if (cluster != null) {
+			cluster.stop();
+			cluster = null;
+		}
+	}
+
+	@Test
+	public void testJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+
+		final JobVertex imalock = new JobVertex("imalock");
+		imalock.setInvokableClass(SemaphoreInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock);
+
+		final ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration(), cluster.highAvailabilityServices(), true);
+
+		try {
+			// acquire the lock to make sure that the job cannot complete until the job client
+			// has been attached in resumingThread
+			lock.acquire();
+			client.runDetached(jobGraph, LegacyJobRetrievalITCase.class.getClassLoader());
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			final Thread resumingThread = new Thread(() -> {
+				try {
+					assertNotNull(client.retrieveJob(jobID));
+				} catch (Throwable e) {
+					error.set(e);
+				}
+			}, "Flink-Job-Retriever");
+
+			final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get();
+			final ActorSystem actorSystem = actorSystemSeq.last();
+			JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+			final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+			// wait until client connects
+			jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef());
+			// confirm registration
+			testkit.expectMsgEquals(true);
+
+			// kick off resuming
+			resumingThread.start();
+
+			// wait for client to connect
+			testkit.expectMsgAllOf(
+				TestingJobManagerMessages.getClientConnected(),
+				TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
+			// client has connected, we can release the lock
+			lock.release();
+
+			resumingThread.join();
+
+			Throwable exception = error.get();
+			if (exception != null) {
+				throw new AssertionError(exception);
+			}
+		} finally {
+			// always shut the client down, also when one of the steps above fails
+			client.shutdown();
+		}
+	}
+
+	@Test
+	public void testNonExistingJobRetrieval() throws Exception {
+		final JobID jobID = new JobID();
+		ClusterClient<StandaloneClusterId> client = new StandaloneClusterClient(cluster.configuration());
+
+		try {
+			client.retrieveJob(jobID);
+			fail();
+		} catch (JobRetrievalException ignored) {
+			// this is what we want
+		} finally {
+			client.shutdown();
+		}
+	}
+
+	/**
+	 * Invokable that waits on {@link #lock} to be released and finishes afterwards.
+	 *
+	 * <p>NOTE: needs to be {@code public} so that a task can be run with this!
+	 */
+	public static class SemaphoreInvokable extends AbstractInvokable {
+
+		public SemaphoreInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			lock.acquire();
+			lock.release();
+		}
+	}
+
+}