You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:12 UTC
[07/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close
calls from ZooKeeper based HA services
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 5f35229..92de31a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,12 +28,14 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -43,22 +45,29 @@ import org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Matchers;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
+import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActor;
import static org.apache.flink.runtime.testingUtils.TestingUtils.createTaskManager;
-import static org.apache.flink.runtime.testingUtils.TestingUtils.createJobManager;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGatewaysGracefully;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGracefully;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* The tests in this class verify the behavior of the TaskManager
@@ -67,8 +76,6 @@ import static org.junit.Assert.*;
*/
public class TaskManagerRegistrationTest extends TestLogger {
- private static final Option<String> NONE_STRING = Option.empty();
-
// use one actor system throughout all tests
private static ActorSystem actorSystem;
@@ -76,6 +83,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
private static FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
+ private TestingHighAvailabilityServices highAvailabilityServices;
+
@BeforeClass
public static void startActorSystem() {
config = new Configuration();
@@ -94,38 +103,58 @@ public class TaskManagerRegistrationTest extends TestLogger {
}
}
+ @Before
+ public void setupTest() {
+ highAvailabilityServices = new TestingHighAvailabilityServices();
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices = null;
+ }
+
/**
* A test that verifies that two TaskManagers correctly register at the
* JobManager.
*/
@Test
- public void testSimpleRegistration() {
+ public void testSimpleRegistration() throws Exception {
new JavaTestKit(actorSystem) {{
ActorGateway jobManager = null;
ActorGateway taskManager1 = null;
ActorGateway taskManager2 = null;
+ ActorGateway resourceManager = null;
+
+ EmbeddedHaServices embeddedHaServices = null;
try {
+ embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
+
// a simple JobManager
- jobManager = createJobManager(
+ jobManager = TestingUtils.createJobManager(
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- config);
- startResourceManager(config, jobManager.actor());
+ config,
+ embeddedHaServices);
+
+ resourceManager = new AkkaActorGateway(
+ startResourceManager(config, embeddedHaServices),
+ jobManager.leaderSessionID());
// start two TaskManagers. it will automatically try to register
taskManager1 = createTaskManager(
actorSystem,
- jobManager,
+ embeddedHaServices,
config,
true,
false);
taskManager2 = createTaskManager(
actorSystem,
- jobManager,
+ embeddedHaServices,
config,
true,
false);
@@ -142,10 +171,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
Object response1 = Await.result(responseFuture1, timeout);
Object response2 = Await.result(responseFuture2, timeout);
- // this is a hack to work around the way Java can interact with scala case objects
- Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
- assertTrue(response1 != null && confirmClass.isAssignableFrom(response1.getClass()));
- assertTrue(response2 != null && confirmClass.isAssignableFrom(response2.getClass()));
+ assertTrue(response1 instanceof TaskManagerMessages.RegisteredAtJobManager);
+ assertTrue(response2 instanceof TaskManagerMessages.RegisteredAtJobManager);
// check that the JobManager has 2 TaskManagers registered
Future<Object> numTaskManagersFuture = jobManager.ask(
@@ -159,9 +186,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
e.printStackTrace();
fail(e.getMessage());
} finally {
- stopActor(taskManager1);
- stopActor(taskManager2);
- stopActor(jobManager);
+ stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager));
+
+ embeddedHaServices.closeAndCleanupAllData();
}
}};
}
@@ -171,36 +198,35 @@ public class TaskManagerRegistrationTest extends TestLogger {
* JobManager.
*/
@Test
- public void testDelayedRegistration() {
+ public void testDelayedRegistration() throws Exception {
new JavaTestKit(actorSystem) {{
ActorGateway jobManager = null;
ActorGateway taskManager = null;
- FiniteDuration delayedTimeout = timeout.$times(3);
+ FiniteDuration delayedTimeout = timeout.$times(3L);
+
+ final EmbeddedHaServices embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor());
try {
// start a TaskManager that tries to register at the JobManager before the JobManager is
// available. we give it the regular JobManager akka URL
taskManager = createTaskManager(
actorSystem,
- AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
+ embeddedHaServices,
new Configuration(),
true,
false);
// let it try for a bit
- Thread.sleep(6000);
+ Thread.sleep(6000L);
// now start the JobManager, with the regular akka URL
- jobManager = createJobManager(
+ jobManager = TestingUtils.createJobManager(
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Configuration());
-
- startResourceManager(config, jobManager.actor());
-
- startResourceManager(config, jobManager.actor());
+ new Configuration(),
+ embeddedHaServices);
// check that the TaskManagers are registered
Future<Object> responseFuture = taskManager.ask(
@@ -209,17 +235,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
Object response = Await.result(responseFuture, delayedTimeout);
- // this is a hack to work around the way Java can interact with scala case objects
- Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
- assertTrue(response != null && confirmClass.isAssignableFrom(response.getClass()));
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ assertTrue(response instanceof TaskManagerMessages.RegisteredAtJobManager);
} finally {
- stopActor(taskManager);
- stopActor(jobManager);
+ stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
+
+ embeddedHaServices.closeAndCleanupAllData();
}
}};
}
@@ -246,10 +266,17 @@ public class TaskManagerRegistrationTest extends TestLogger {
Configuration tmConfig = new Configuration();
tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 ms");
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ // Give a non-existent job manager address to the task manager
+ new TestingLeaderRetrievalService(
+ "foobar",
+ HighAvailabilityServices.DEFAULT_LEADER_ID));
+
// start the taskManager actor
taskManager = createTaskManager(
actorSystem,
- AkkaUtils.getLocalAkkaURL(JobMaster.JOB_MANAGER_NAME),
+ highAvailabilityServices,
tmConfig,
true,
false);
@@ -271,7 +298,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
e.printStackTrace();
fail(e.getMessage());
} finally {
- stopActor(taskManager);
+ stopActorGracefully(taskManager);
}
}};
}
@@ -286,18 +313,28 @@ public class TaskManagerRegistrationTest extends TestLogger {
ActorGateway jm = null;
ActorGateway taskManager =null;
try {
- jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty());
+ jm = TestingUtils.createForwardingActor(
+ actorSystem,
+ getTestActor(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID,
+ Option.<String>empty());
final ActorGateway jmGateway = jm;
FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
Configuration tmConfig = new Configuration(config);
tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new TestingLeaderRetrievalService(
+ jm.path(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID));
+
// we make the test actor (the test kit) the JobManager to intercept
// the messages
taskManager = createTaskManager(
actorSystem,
- jmGateway,
+ highAvailabilityServices,
tmConfig,
true,
false);
@@ -330,13 +367,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
expectMsgClass(RegisterTaskManager.class);
}
};
- }
- catch (Throwable e) {
- e.printStackTrace();
- fail(e.getMessage());
} finally {
- stopActor(taskManager);
- stopActor(jm);
+ stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
}
}};
}
@@ -353,7 +385,18 @@ public class TaskManagerRegistrationTest extends TestLogger {
try {
FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
- jm = TestingUtils.createForwardingActor(actorSystem, getTestActor(), Option.<String>empty());
+ jm = TestingUtils.createForwardingActor(
+ actorSystem,
+ getTestActor(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID,
+ Option.<String>empty());
+
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new TestingLeaderRetrievalService(
+ jm.path(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID));
+
final ActorGateway jmGateway = jm;
long refusedRegistrationPause = 500;
@@ -368,7 +411,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
// the messages
taskManager = createTaskManager(
actorSystem,
- jmGateway,
+ highAvailabilityServices,
tmConfig,
true,
false);
@@ -423,8 +466,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
+ maxExpectedNumberOfRegisterTaskManagerMessages,
registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages);
} finally {
- stopActor(taskManager);
- stopActor(jm);
+ stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
}
}};
}
@@ -444,16 +486,25 @@ public class TaskManagerRegistrationTest extends TestLogger {
try {
fakeJobManager1Gateway = TestingUtils.createForwardingActor(
- actorSystem,
- getTestActor(),
- Option.apply(JOB_MANAGER_NAME));
+ actorSystem,
+ getTestActor(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID,
+ Option.apply(JOB_MANAGER_NAME));
final ActorGateway fakeJM1Gateway = fakeJobManager1Gateway;
+ TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+ fakeJM1Gateway.path(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ testingLeaderRetrievalService);
+
// we make the test actor (the test kit) the JobManager to intercept
// the messages
taskManagerGateway = createTaskManager(
actorSystem,
- fakeJobManager1Gateway,
+ highAvailabilityServices,
config,
true,
false);
@@ -512,9 +563,10 @@ public class TaskManagerRegistrationTest extends TestLogger {
do {
try {
fakeJobManager2Gateway = TestingUtils.createForwardingActor(
- actorSystem,
- getTestActor(),
- Option.apply(JOB_MANAGER_NAME));
+ actorSystem,
+ getTestActor(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID,
+ Option.apply(JOB_MANAGER_NAME));
} catch (InvalidActorNameException e) {
// wait and retry
Thread.sleep(100);
@@ -543,9 +595,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
e.printStackTrace();
fail(e.getMessage());
} finally {
- stopActor(taskManagerGateway);
- stopActor(fakeJobManager1Gateway);
- stopActor(fakeJobManager2Gateway);
+ stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway));
}
}};
}
@@ -556,21 +606,25 @@ public class TaskManagerRegistrationTest extends TestLogger {
ActorGateway taskManagerGateway = null;
+ final UUID falseLeaderSessionID = UUID.randomUUID();
+ final UUID trueLeaderSessionID = UUID.randomUUID();
+
+ HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class);
+ when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
+ .thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID));
+
try {
// we make the test actor (the test kit) the JobManager to intercept
// the messages
taskManagerGateway = createTaskManager(
actorSystem,
- getTestActor(),
+ mockedHighAvailabilityServices,
config,
true,
false);
final ActorRef taskManager = taskManagerGateway.actor();
- final UUID falseLeaderSessionID = UUID.randomUUID();
- final UUID trueLeaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
-
new Within(timeout) {
@Override
@@ -581,7 +635,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
- assertTrue(lsm.leaderSessionID() == trueLeaderSessionID);
+ assertTrue(lsm.leaderSessionID().equals(trueLeaderSessionID));
assertTrue(lsm.message() instanceof RegisterTaskManager);
final ActorRef tm = getLastSender();
@@ -606,9 +660,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
getTestActor());
Object message = null;
- Object confirmMessageClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
- while(message == null || !(message.getClass().equals(confirmMessageClass))) {
+ while(!(message instanceof TaskManagerMessages.RegisteredAtJobManager)) {
message = receiveOne(TestingUtils.TESTING_DURATION());
}
@@ -617,12 +670,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
expectMsgEquals(new JobManagerMessages.ResponseLeaderSessionID(trueLeaderSessionID));
}
};
- }
- catch (Throwable e) {
- e.printStackTrace();
- fail(e.getMessage());
} finally {
- stopActor(taskManagerGateway);
+ stopActorGracefully(taskManagerGateway);
}
}};
}
@@ -631,25 +680,11 @@ public class TaskManagerRegistrationTest extends TestLogger {
// Utility Functions
// --------------------------------------------------------------------------------------------
- private static ActorRef startJobManager(Configuration configuration) throws Exception {
- // start the actors. don't give names, so they get generated names and we
- // avoid conflicts with the actor names
- return JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- NONE_STRING,
- NONE_STRING,
- JobManager.class,
- MemoryArchivist.class)._1();
- }
-
- private static ActorRef startResourceManager(Configuration config, ActorRef jobManager) {
+ private static ActorRef startResourceManager(Configuration config, HighAvailabilityServices highAvailabilityServices) {
return FlinkResourceManager.startResourceManagerActors(
config,
actorSystem,
- new StandaloneLeaderRetrievalService(jobManager.path().toString()),
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
StandaloneResourceManager.class);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 4df8db3..0e77ddd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -26,9 +26,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.StartupUtils;
import org.apache.flink.util.NetUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import scala.Option;
@@ -46,6 +50,21 @@ import java.util.UUID;
* problems.
*/
public class TaskManagerStartupTest {
+
+ private HighAvailabilityServices highAvailabilityServices;
+
+ @Before
+ public void setupTest() {
+ highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices = null;
+ }
+ }
/**
@@ -55,8 +74,9 @@ public class TaskManagerStartupTest {
* @throws Throwable
*/
@Test(expected = BindException.class)
- public void testStartupWhenTaskmanagerActorPortIsUsed() throws BindException {
+ public void testStartupWhenTaskmanagerActorPortIsUsed() throws Exception {
ServerSocket blocker = null;
+
try {
final String localHostName = "localhost";
final InetAddress localBindAddress = InetAddress.getByName(NetUtils.getWildcardIPAddress());
@@ -65,8 +85,13 @@ public class TaskManagerStartupTest {
blocker = new ServerSocket(0, 50, localBindAddress);
final int port = blocker.getLocalPort();
- TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(),
- TaskManager.class);
+ TaskManager.runTaskManager(
+ localHostName,
+ ResourceID.generate(),
+ port,
+ new Configuration(),
+ highAvailabilityServices,
+ TaskManager.class);
fail("This should fail with an IOException");
}
@@ -92,6 +117,8 @@ public class TaskManagerStartupTest {
// no need to log here
}
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
@@ -102,7 +129,7 @@ public class TaskManagerStartupTest {
* directories are not writable.
*/
@Test
- public void testIODirectoryNotWritable() {
+ public void testIODirectoryNotWritable() throws Exception {
File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
File nonWritable = new File(tempDir, UUID.randomUUID().toString());
@@ -119,7 +146,12 @@ public class TaskManagerStartupTest {
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
try {
- TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
+ TaskManager.runTaskManager(
+ "localhost",
+ ResourceID.generate(),
+ 0,
+ cfg,
+ highAvailabilityServices);
fail("Should fail synchronously with an exception");
}
catch (IOException e) {
@@ -139,6 +171,8 @@ public class TaskManagerStartupTest {
catch (IOException e) {
// best effort
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
@@ -157,7 +191,12 @@ public class TaskManagerStartupTest {
// something invalid
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
try {
- TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
+ TaskManager.runTaskManager(
+ "localhost",
+ ResourceID.generate(),
+ 0,
+ cfg,
+ highAvailabilityServices);
fail("Should fail synchronously with an exception");
}
catch (IllegalConfigurationException e) {
@@ -169,7 +208,12 @@ public class TaskManagerStartupTest {
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()) >> 20;
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize);
try {
- TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
+ TaskManager.runTaskManager(
+ "localhost",
+ ResourceID.generate(),
+ 0,
+ cfg,
+ highAvailabilityServices);
fail("Should fail synchronously with an exception");
}
catch (Exception e) {
@@ -204,9 +248,9 @@ public class TaskManagerStartupTest {
cfg,
ResourceID.generate(),
null,
+ highAvailabilityServices,
"localhost",
Option.<String>empty(),
- Option.<LeaderRetrievalService>empty(),
false,
TaskManager.class);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 4530ade..0f5afc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
@@ -60,6 +61,8 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
@@ -80,8 +83,10 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -126,7 +131,9 @@ public class TaskManagerTest extends TestLogger {
private static ActorSystem system;
- final static UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
+ final static UUID leaderSessionID = UUID.randomUUID();
+
+ private TestingHighAvailabilityServices highAvailabilityServices;
@BeforeClass
public static void setup() {
@@ -138,20 +145,39 @@ public class TaskManagerTest extends TestLogger {
JavaTestKit.shutdownActorSystem(system);
}
+ @Before
+ public void setupTest() {
+ highAvailabilityServices = new TestingHighAvailabilityServices();
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+
+ highAvailabilityServices = null;
+ }
+ }
+
@Test
public void testSubmitAndExecuteTask() throws IOException {
new JavaTestKit(system){{
ActorGateway taskManager = null;
final ActorGateway jobManager = TestingUtils.createForwardingActor(
- system,
- getTestActor(),
- Option.<String>empty());
+ system,
+ getTestActor(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID,
+ Option.<String>empty());
+
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
try {
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
false);
@@ -261,9 +287,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -398,9 +428,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -524,9 +558,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -624,9 +662,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(leaderSessionID)));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -769,9 +811,13 @@ public class TaskManagerTest extends TestLogger {
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -906,6 +952,10 @@ public class TaskManagerTest extends TestLogger {
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
@@ -914,7 +964,7 @@ public class TaskManagerTest extends TestLogger {
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
config,
false,
true);
@@ -1024,13 +1074,17 @@ public class TaskManagerTest extends TestLogger {
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
final Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
config,
true,
true);
@@ -1122,9 +1176,13 @@ public class TaskManagerTest extends TestLogger {
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
config,
false,
true);
@@ -1166,18 +1224,26 @@ public class TaskManagerTest extends TestLogger {
// We need this to be a JM that answers to update messages for
// robustness on Travis (if jobs need to be resubmitted in (4)).
- ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(leaderSessionID)));
- ActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, leaderSessionID);
+ ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(
+ HighAvailabilityServices.DEFAULT_LEADER_ID)));
+ ActorGateway jobManagerActorGateway = new AkkaActorGateway(
+ jm,
+ HighAvailabilityServices.DEFAULT_LEADER_ID);
final ActorGateway testActorGateway = new AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ HighAvailabilityServices.DEFAULT_LEADER_ID);
try {
final ActorGateway jobManager = jobManagerActorGateway;
+
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
final ActorGateway taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
false);
@@ -1459,11 +1525,15 @@ public class TaskManagerTest extends TestLogger {
@Test
public void testTerminationOnFatalError() {
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new TestingLeaderRetrievalService());
+
new JavaTestKit(system){{
final ActorGateway taskManager = TestingUtils.createTaskManager(
system,
- system.deadLetters(), // no jobmanager
+ highAvailabilityServices, // no jobmanager
new Configuration(),
true,
false);
@@ -1524,9 +1594,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jmActorRef = system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, leaderSessionID), "jobmanager");
ActorGateway jobManager = new AkkaActorGateway(jmActorRef, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
final ActorGateway taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
configuration,
true,
true);
@@ -1562,9 +1636,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -1619,9 +1697,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -1679,9 +1761,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
@@ -1724,9 +1810,13 @@ public class TaskManagerTest extends TestLogger {
ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
+
taskManager = TestingUtils.createTaskManager(
system,
- jobManager,
+ highAvailabilityServices,
new Configuration(),
true,
true);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index b2e8005..4be3299 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmanager
import java.net.InetAddress
-import java.util.concurrent.{Executors, ScheduledExecutorService}
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.highavailability.HighAvailabilityServices
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices
import org.apache.flink.runtime.instance._
import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
@@ -40,7 +41,7 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils
import org.junit.Assert.{assertNotEquals, assertNotNull}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, WordSpecLike}
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -51,31 +52,45 @@ import scala.language.postfixOps
*/
@RunWith(classOf[JUnitRunner])
class JobManagerRegistrationTest(_system: ActorSystem) extends TestKit(_system) with
-ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
+ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
def this() = this(AkkaUtils.createLocalActorSystem(new Configuration()))
val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+
+ var highAvailabilityServices: HighAvailabilityServices = _
+
+ val timeout = FiniteDuration(30, TimeUnit.SECONDS)
override def afterAll(): Unit = {
executor.shutdownNow()
TestKit.shutdownActorSystem(system)
}
+ override def beforeEach(): Unit = {
+ highAvailabilityServices = new EmbeddedHaServices(executor)
+ }
+
+ override def afterEach(): Unit = {
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData()
+ }
+ }
+
"The JobManager" should {
"assign a TaskManager a unique instance ID" in {
var jmOption: Option[ActorGateway] = None
- var rmOption: Option[ActorGateway] = None
+ var rmOption: Option[ActorRef] = None
var tm1Option: Option[ActorRef] = None
var tm2Option: Option[ActorRef] = None
try {
- val jm = startTestingJobManager(_system)
+ val jm = startTestingJobManager(_system, highAvailabilityServices)
jmOption = Some(jm)
- val rm = startTestingResourceManager(_system, jm.actor())
+ val rm = startTestingResourceManager(_system, highAvailabilityServices)
rmOption = Some(rm)
val probe = TestProbe()
@@ -149,19 +164,19 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
"handle repeated registration calls" in {
var jmOption: Option[ActorGateway] = None
- var rmOption: Option[ActorGateway] = None
+ var rmOption: Option[ActorRef] = None
try {
val probe = TestProbe()
- val jm = startTestingJobManager(_system)
+ val jm = startTestingJobManager(_system, highAvailabilityServices)
jmOption = Some(jm)
- val rm = startTestingResourceManager(_system, jm.actor())
+ val rm = startTestingResourceManager(_system, highAvailabilityServices)
rmOption = Some(rm)
- val selfGateway = new AkkaActorGateway(
- probe.ref,
- HighAvailabilityServices.DEFAULT_LEADER_ID)
+ val leaderId = jm.leaderSessionID()
+
+ val selfGateway = new AkkaActorGateway(probe.ref, leaderId)
val resourceID = ResourceID.generate()
val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1)
@@ -199,22 +214,22 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
probe.expectMsgType[LeaderSessionMessage] match {
case LeaderSessionMessage(
- HighAvailabilityServices.DEFAULT_LEADER_ID,
- AcknowledgeRegistration(_, _)) =>
+ `leaderId`,
+ AcknowledgeRegistration(_, _)) =>
case m => fail("Wrong message type: " + m)
}
probe.expectMsgType[LeaderSessionMessage] match {
case LeaderSessionMessage(
- HighAvailabilityServices.DEFAULT_LEADER_ID,
- AlreadyRegistered(_, _)) =>
+ `leaderId`,
+ AlreadyRegistered(_, _)) =>
case m => fail("Wrong message type: " + m)
}
probe.expectMsgType[LeaderSessionMessage] match {
case LeaderSessionMessage(
- HighAvailabilityServices.DEFAULT_LEADER_ID,
- AlreadyRegistered(_, _)) =>
+ `leaderId`,
+ AlreadyRegistered(_, _)) =>
case m => fail("Wrong message type: " + m)
}
}
@@ -225,14 +240,16 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
}
}
- private def startTestingJobManager(system: ActorSystem): ActorGateway = {
- val config = new Configuration()
+ private def startTestingJobManager(
+ system: ActorSystem,
+ highAvailabilityServices: HighAvailabilityServices): ActorGateway = {
+ val config = new Configuration()
+
val components = JobManager.createJobManagerComponents(
config,
executor,
- executor,
- None)
+ executor)
// Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService.
// The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus,
@@ -249,25 +266,34 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
ActorRef.noSender,
components._4,
components._5,
+ highAvailabilityServices.getJobManagerLeaderElectionService(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
+ highAvailabilityServices.getSubmittedJobGraphStore(),
+ highAvailabilityServices.getCheckpointRecoveryFactory(),
components._8,
- components._9,
- components._10,
- components._11,
None)
- val jm = _system.actorOf(props)
+ _system.actorOf(props)
- new AkkaActorGateway(jm, HighAvailabilityServices.DEFAULT_LEADER_ID)
+ LeaderRetrievalUtils.retrieveLeaderGateway(
+ highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
+ system,
+ timeout)
}
- private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): ActorGateway = {
+ private def startTestingResourceManager(
+ system: ActorSystem,
+ highAvailabilityServices: HighAvailabilityServices)
+ : ActorRef = {
val config = new Configuration()
- val rm: ActorRef = FlinkResourceManager.startResourceManagerActors(
+
+ FlinkResourceManager.startResourceManagerActors(
config,
_system,
- LeaderRetrievalUtils.createLeaderRetrievalService(config, jm),
+ highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
classOf[TestingResourceManager])
- new AkkaActorGateway(rm, HighAvailabilityServices.DEFAULT_LEADER_ID)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index bd4a8fc..7980cdf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager}
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
@@ -48,7 +49,7 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWh
import org.apache.flink.runtime.testutils.TestingResourceManager
import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
/**
* Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support
@@ -60,16 +61,35 @@ import scala.concurrent.{Await, Future}
*/
class TestingCluster(
userConfiguration: Configuration,
+ highAvailabilityServices: HighAvailabilityServices,
singleActorSystem: Boolean,
synchronousDispatcher: Boolean)
extends LocalFlinkMiniCluster(
userConfiguration,
+ highAvailabilityServices,
singleActorSystem) {
- def this(userConfiguration: Configuration, singleActorSystem: Boolean) =
- this(userConfiguration, singleActorSystem, false)
+ def this(
+ userConfiguration: Configuration,
+ singleActorSystem: Boolean,
+ synchronousDispatcher: Boolean) = {
+ this(
+ userConfiguration,
+ HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ userConfiguration,
+ ExecutionContext.global),
+ singleActorSystem,
+ synchronousDispatcher)
+ }
+
+ def this(userConfiguration: Configuration, singleActorSystem: Boolean) = {
+ this(
+ userConfiguration,
+ singleActorSystem,
+ false)
+ }
- def this(userConfiguration: Configuration) = this(userConfiguration, true, false)
+ def this(userConfiguration: Configuration) = this(userConfiguration, true)
// --------------------------------------------------------------------------
@@ -224,10 +244,13 @@ class TestingCluster(
Seq(newJobManagerActorSystem),
1))
- val lrs = createLeaderRetrievalService()
+ jobManagerLeaderRetrievalService.foreach(_.stop())
+
+ jobManagerLeaderRetrievalService = Option(
+ highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID))
- jobManagerLeaderRetrievalService = Some(lrs)
- lrs.start(this)
+ jobManagerLeaderRetrievalService.foreach(_.start(this))
case _ => throw new Exception("The JobManager of the TestingCluster have not " +
"been started properly.")
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index d139a3f..c8977f0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -27,22 +27,18 @@ import akka.pattern.{Patterns, ask}
import com.google.common.util.concurrent.MoreExecutors
import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logger
-import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter}
-import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
-import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
import org.apache.flink.runtime.jobmaster.JobMaster
-import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.runtime.util.LeaderRetrievalUtils
@@ -222,69 +218,13 @@ object TestingUtils {
}
}
- def createTaskManager(
- actorSystem: ActorSystem,
- jobManager: ActorRef,
- configuration: Configuration,
- useLocalCommunication: Boolean,
- waitForRegistration: Boolean)
- : ActorGateway = {
- val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager)
-
- createTaskManager(
- actorSystem,
- jobManagerURL,
- configuration,
- useLocalCommunication,
- waitForRegistration
- )
- }
-
- def createTaskManager(
- actorSystem: ActorSystem,
- jobManager: ActorGateway,
- configuration: Configuration,
- useLocalCommunication: Boolean,
- waitForRegistration: Boolean,
- taskManagerClass: Class[_ <: TaskManager])
- : ActorGateway = {
- val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
-
- createTaskManager(
- actorSystem,
- jobManagerURL,
- configuration,
- useLocalCommunication,
- waitForRegistration,
- taskManagerClass
- )
- }
-
- def createTaskManager(
- actorSystem: ActorSystem,
- jobManager: ActorGateway,
- configuration: Configuration,
- useLocalCommunication: Boolean,
- waitForRegistration: Boolean)
- : ActorGateway = {
- val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
-
- createTaskManager(
- actorSystem,
- jobManagerURL,
- configuration,
- useLocalCommunication,
- waitForRegistration
- )
- }
-
/** Creates a local TaskManager in the given ActorSystem. It is given a
* [[StandaloneLeaderRetrievalService]] which returns the given jobManagerURL. After creating
* the TaskManager, waitForRegistration specifies whether one waits until the TaskManager has
* registered at the JobManager. An ActorGateway to the TaskManager is returned.
*
* @param actorSystem ActorSystem in which the TaskManager shall be started
- * @param jobManagerURL URL of the JobManager to connect to
+ * @param highAvailabilityServices Service factory for high availability
* @param configuration Configuration
* @param useLocalCommunication true if the network stack shall use exclusively local
* communication
@@ -293,15 +233,15 @@ object TestingUtils {
* @return ActorGateway of the created TaskManager
*/
def createTaskManager(
- actorSystem: ActorSystem,
- jobManagerURL: String,
- configuration: Configuration,
- useLocalCommunication: Boolean,
- waitForRegistration: Boolean)
- : ActorGateway = {
+ actorSystem: ActorSystem,
+ highAvailabilityServices: HighAvailabilityServices,
+ configuration: Configuration,
+ useLocalCommunication: Boolean,
+ waitForRegistration: Boolean)
+ : ActorGateway = {
createTaskManager(
actorSystem,
- jobManagerURL,
+ highAvailabilityServices,
configuration,
useLocalCommunication,
waitForRegistration,
@@ -311,7 +251,7 @@ object TestingUtils {
def createTaskManager(
actorSystem: ActorSystem,
- jobManagerURL: String,
+ highAvailabilityServices: HighAvailabilityServices,
configuration: Configuration,
useLocalCommunication: Boolean,
waitForRegistration: Boolean,
@@ -324,26 +264,27 @@ object TestingUtils {
resultingConfiguration.addAll(configuration)
- val leaderRetrievalService = Option(new StandaloneLeaderRetrievalService(jobManagerURL))
-
val taskManager = TaskManager.startTaskManagerComponentsAndActor(
resultingConfiguration,
ResourceID.generate(),
actorSystem,
+ highAvailabilityServices,
"localhost",
None,
- leaderRetrievalService,
useLocalCommunication,
taskManagerClass
)
- if (waitForRegistration) {
+ val leaderId = if (waitForRegistration) {
val notificationResult = (taskManager ? NotifyWhenRegisteredAtJobManager)(TESTING_DURATION)
+ .mapTo[RegisteredAtJobManager]
- Await.ready(notificationResult, TESTING_DURATION)
+ Await.result(notificationResult, TESTING_DURATION).leaderId
+ } else {
+ HighAvailabilityServices.DEFAULT_LEADER_ID
}
- new AkkaActorGateway(taskManager, HighAvailabilityServices.DEFAULT_LEADER_ID)
+ new AkkaActorGateway(taskManager, leaderId)
}
/** Stops the given actor by sending it a Kill message
@@ -428,13 +369,15 @@ object TestingUtils {
actorSystem: ActorSystem,
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
- configuration: Configuration)
+ configuration: Configuration,
+ highAvailabilityServices: HighAvailabilityServices)
: ActorGateway = {
createJobManager(
actorSystem,
futureExecutor,
ioExecutor,
configuration,
+ highAvailabilityServices,
classOf[TestingJobManager],
""
)
@@ -455,6 +398,7 @@ object TestingUtils {
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
configuration: Configuration,
+ highAvailabilityServices: HighAvailabilityServices,
prefix: String)
: ActorGateway = {
createJobManager(
@@ -462,6 +406,7 @@ object TestingUtils {
futureExecutor,
ioExecutor,
configuration,
+ highAvailabilityServices,
classOf[TestingJobManager],
prefix
)
@@ -474,6 +419,7 @@ object TestingUtils {
* @param futureExecutor to run the JobManager's futures
* @param ioExecutor to run blocking io operations
* @param configuration Configuration to use
+ * @param highAvailabilityServices Service factory for high availability
* @param jobManagerClass JobManager class to instantiate
* @return
*/
@@ -482,10 +428,18 @@ object TestingUtils {
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
configuration: Configuration,
+ highAvailabilityServices: HighAvailabilityServices,
jobManagerClass: Class[_ <: JobManager])
: ActorGateway = {
- createJobManager(actorSystem, futureExecutor, ioExecutor, configuration, jobManagerClass, "")
+ createJobManager(
+ actorSystem,
+ futureExecutor,
+ ioExecutor,
+ configuration,
+ highAvailabilityServices,
+ jobManagerClass,
+ "")
}
/**
@@ -496,6 +450,7 @@ object TestingUtils {
* @param futureExecutor to run the JobManager's futures
* @param ioExecutor to run blocking io operations
* @param configuration Configuration to use
+ * @param highAvailabilityServices Service factory for high availability
* @param jobManagerClass JobManager class to instantiate
* @param prefix The prefix to use for the Actor names
* @return
@@ -505,6 +460,7 @@ object TestingUtils {
futureExecutor: ScheduledExecutorService,
ioExecutor: Executor,
configuration: Configuration,
+ highAvailabilityServices: HighAvailabilityServices,
jobManagerClass: Class[_ <: JobManager],
prefix: String)
: ActorGateway = {
@@ -518,24 +474,33 @@ object TestingUtils {
actorSystem,
futureExecutor,
ioExecutor,
+ highAvailabilityServices,
Some(prefix + JobMaster.JOB_MANAGER_NAME),
Some(prefix + JobMaster.ARCHIVE_NAME),
jobManagerClass,
classOf[MemoryArchivist])
- new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
+
+ val leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT)
+
+ new AkkaActorGateway(actor, leaderId)
}
/** Creates a forwarding JobManager which sends all received message to the forwarding target.
*
* @param actorSystem The actor system to start the actor in.
* @param forwardingTarget Target to forward to.
+ * @param leaderId leader id for the actor gateway
* @param actorName Name for forwarding Actor
* @return
*/
def createForwardingActor(
actorSystem: ActorSystem,
forwardingTarget: ActorRef,
+ leaderId: UUID,
actorName: Option[String] = None)
: ActorGateway = {
@@ -545,7 +510,7 @@ object TestingUtils {
Props(
classOf[ForwardingActor],
forwardingTarget,
- Option(HighAvailabilityServices.DEFAULT_LEADER_ID)),
+ Option(leaderId)),
name
)
case None =>
@@ -553,58 +518,39 @@ object TestingUtils {
Props(
classOf[ForwardingActor],
forwardingTarget,
- Option(HighAvailabilityServices.DEFAULT_LEADER_ID))
+ Option(leaderId))
)
}
- new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
- }
-
- def submitJobAndWait(
- actorSystem: ActorSystem,
- jobManager: ActorGateway,
- jobGraph: JobGraph,
- config: Configuration)
- : JobExecutionResult = {
-
- val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
- val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerURL)
-
- JobClient.submitJobAndWait(
- actorSystem,
- config,
- leaderRetrievalService,
- jobGraph,
- TESTING_DURATION,
- false,
- Thread.currentThread().getContextClassLoader
- )
+ new AkkaActorGateway(actor, leaderId)
}
- /** Creates a testing JobManager using the default recovery mode (standalone)
+ /** Creates a testing JobManager using the given configuration and high availability services.
*
* @param actorSystem The actor system to start the actor
- * @param jobManager The jobManager for the standalone leader service.
* @param configuration The configuration
+ * @param highAvailabilityServices The high availability services to use
* @return
*/
def createResourceManager(
actorSystem: ActorSystem,
- jobManager: ActorRef,
- configuration: Configuration)
+ configuration: Configuration,
+ highAvailabilityServices: HighAvailabilityServices)
: ActorGateway = {
- configuration.setString(
- HighAvailabilityOptions.HA_MODE,
- ConfigConstants.DEFAULT_HA_MODE)
-
- val actor = FlinkResourceManager.startResourceManagerActors(
+ val resourceManager = FlinkResourceManager.startResourceManagerActors(
configuration,
actorSystem,
- LeaderRetrievalUtils.createLeaderRetrievalService(configuration, jobManager),
+ highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
classOf[TestingResourceManager])
- new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
+ val leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT)
+
+ new AkkaActorGateway(resourceManager, leaderId)
}
class ForwardingActor(val target: ActorRef, val leaderSessionID: Option[UUID])
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 4d3adb2..bb26454 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -25,6 +25,7 @@ import org.apache.commons.cli.CommandLine
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.client.CliFrontend
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
@@ -140,18 +141,17 @@ object FlinkShell {
def fetchConnectionInfo(
config: Config
- ): (String, Int, Option[Either[FlinkMiniCluster, ClusterClient]]) = {
+ ): (String, Int, Option[Either[StandaloneMiniCluster, ClusterClient]]) = {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
val config = GlobalConfiguration.loadConfiguration()
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
- val miniCluster = new LocalFlinkMiniCluster(config, false)
- miniCluster.start()
+ val miniCluster = new StandaloneMiniCluster(config)
println("\nStarting local Flink cluster (host: localhost, " +
- s"port: ${miniCluster.getLeaderRPCPort}).\n")
- ("localhost", miniCluster.getLeaderRPCPort, Some(Left(miniCluster)))
+ s"port: ${miniCluster.getPort}).\n")
+ ("localhost", miniCluster.getPort, Some(Left(miniCluster)))
case ExecutionMode.REMOTE => // Remote mode
if (config.host.isEmpty || config.port.isEmpty) {
@@ -188,7 +188,7 @@ object FlinkShell {
val (repl, cluster) = try {
val (host, port, cluster) = fetchConnectionInfo(config)
val conf = cluster match {
- case Some(Left(miniCluster)) => miniCluster.userConfiguration
+ case Some(Left(miniCluster)) => miniCluster.getConfiguration
case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
case None => GlobalConfiguration.loadConfiguration()
}
@@ -218,7 +218,7 @@ object FlinkShell {
} finally {
repl.closeInterpreter()
cluster match {
- case Some(Left(miniCluster)) => miniCluster.stop()
+ case Some(Left(miniCluster)) => miniCluster.close()
case Some(Right(yarnCluster)) => yarnCluster.shutdown()
case _ =>
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 00410cc..0e89da3 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,14 +19,16 @@
package org.apache.flink.api.scala
import java.io._
-import java.util.concurrent.TimeUnit
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import akka.actor.ActorRef
+import akka.pattern.Patterns
+import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
import org.junit.{AfterClass, Assert, BeforeClass, Test}
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.tools.nsc.Settings
@@ -299,18 +301,19 @@ class ScalaShellITCase extends TestLogger {
.getFile
val confDir = new File(confFile).getAbsoluteFile.getParent
- val (c, args) = cluster match{
+ val args = cluster match {
case Some(cl) =>
- val arg = Array("remote",
- cl.hostname,
- Integer.toString(cl.getLeaderRPCPort),
+ Array(
+ "remote",
+ cl.getHostname,
+ Integer.toString(cl.getPort),
"--configDir",
confDir)
- (cl, arg)
- case None =>
- throw new AssertionError("Cluster creation failed.")
+ case None => throw new IllegalStateException("Cluster has not been started.")
}
+
+
//start scala shell with initialized
// buffered reader for testing
FlinkShell.bufferedReader = Some(in)
@@ -335,26 +338,24 @@ class ScalaShellITCase extends TestLogger {
}
object ScalaShellITCase {
- var cluster: Option[LocalFlinkMiniCluster] = None
+ var cluster: Option[StandaloneMiniCluster] = None
+
val parallelism = 4
+ val configuration = new Configuration()
@BeforeClass
def beforeAll(): Unit = {
- val cl = TestBaseUtils.startCluster(
- 1,
- parallelism,
- false,
- false,
- false)
-
- cluster = Some(cl)
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+
+ cluster = Option(new StandaloneMiniCluster(configuration))
}
@AfterClass
def afterAll(): Unit = {
// The Scala interpreter somehow changes the class loader. Therfore, we have to reset it
Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
- cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+
+ cluster.foreach(_.close)
}
/**
@@ -371,47 +372,46 @@ object ScalaShellITCase {
val oldOut = System.out
System.setOut(new PrintStream(baos))
- // new local cluster
- val host = "localhost"
- val port = cluster match {
- case Some(c) => c.getLeaderRPCPort
- case _ => throw new RuntimeException("Test cluster not initialized.")
- }
-
- val repl = externalJars match {
- case Some(ej) => new FlinkILoop(
- host, port,
- GlobalConfiguration.loadConfiguration(),
- Option(Array(ej)),
- in, new PrintWriter(out))
-
- case None => new FlinkILoop(
- host, port,
- GlobalConfiguration.loadConfiguration(),
- in, new PrintWriter(out))
- }
+ cluster match {
+ case Some(cl) =>
+ val repl = externalJars match {
+ case Some(ej) => new FlinkILoop(
+ cl.getHostname,
+ cl.getPort,
+ GlobalConfiguration.loadConfiguration(),
+ Option(Array(ej)),
+ in, new PrintWriter(out))
+
+ case None => new FlinkILoop(
+ cl.getHostname,
+ cl.getPort,
+ GlobalConfiguration.loadConfiguration(),
+ in, new PrintWriter(out))
+ }
- repl.settings = new Settings()
+ repl.settings = new Settings()
- // enable this line to use scala in intellij
- repl.settings.usejavacp.value = true
+ // enable this line to use scala in intellij
+ repl.settings.usejavacp.value = true
- externalJars match {
- case Some(ej) => repl.settings.classpath.value = ej
- case None =>
- }
+ externalJars match {
+ case Some(ej) => repl.settings.classpath.value = ej
+ case None =>
+ }
- repl.process(repl.settings)
+ repl.process(repl.settings)
- repl.closeInterpreter()
+ repl.closeInterpreter()
- System.setOut(oldOut)
+ System.setOut(oldOut)
- baos.flush()
+ baos.flush()
- val stdout = baos.toString
+ val stdout = baos.toString
- out.toString + stdout
+ out.toString + stdout
+ case _ => throw new IllegalStateException("The cluster has not been started.")
+ }
}
def findLibraryFolder(paths: String*): File = {
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ff5af87..2ff45ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -219,7 +219,11 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
throw new ProgramInvocationException("The program execution failed" + term, e);
}
finally {
- client.shutdown();
+ try {
+ client.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Could not properly shut down the cluster client.", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 64c68dc..5719456 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -26,43 +27,84 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Preconditions;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
/**
- * A StreamExecutionEnvironment that executes its jobs on a test cluster.
+ * A {@link StreamExecutionEnvironment} that executes its jobs on {@link LocalFlinkMiniCluster}.
*/
public class TestStreamEnvironment extends StreamExecutionEnvironment {
/** The mini cluster in which this environment executes its jobs */
- private LocalFlinkMiniCluster executor;
-
+ private final LocalFlinkMiniCluster miniCluster;
+
+ private final Collection<Path> jarFiles;
+
+ private final Collection<URL> classPaths;
+
+ public TestStreamEnvironment(
+ LocalFlinkMiniCluster miniCluster,
+ int parallelism,
+ Collection<Path> jarFiles,
+ Collection<URL> classPaths) {
+
+ this.miniCluster = Preconditions.checkNotNull(miniCluster);
+ this.jarFiles = Preconditions.checkNotNull(jarFiles);
+ this.classPaths = Preconditions.checkNotNull(classPaths);
- public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
- this.executor = Preconditions.checkNotNull(executor);
setParallelism(parallelism);
}
+
+ public TestStreamEnvironment(
+ LocalFlinkMiniCluster miniCluster,
+ int parallelism) {
+ this(miniCluster, parallelism, Collections.<Path>emptyList(), Collections.<URL>emptyList());
+ }
+
@Override
public JobExecutionResult execute(String jobName) throws Exception {
final StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
final JobGraph jobGraph = streamGraph.getJobGraph();
- return executor.submitJobAndWait(jobGraph, false);
+
+ for (Path jarFile: jarFiles) {
+ jobGraph.addJar(jarFile);
+ }
+
+ jobGraph.setClasspaths(new ArrayList<>(classPaths));
+
+ return miniCluster.submitJobAndWait(jobGraph, false);
}
// ------------------------------------------------------------------------
/**
* Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
- * the given cluster with the given default parallelism.
- *
+ * the given cluster with the given default parallelism and the specified jar files and class
+ * paths.
+ *
* @param cluster The test cluster to run the test program on.
* @param parallelism The default parallelism for the test programs.
+ * @param jarFiles Additional jar files to execute the job with
+ * @param classpaths Additional class paths to execute the job with
*/
- public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
-
+ public static void setAsContext(
+ final LocalFlinkMiniCluster cluster,
+ final int parallelism,
+ final Collection<Path> jarFiles,
+ final Collection<URL> classpaths) {
+
StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
@Override
public StreamExecutionEnvironment createExecutionEnvironment() {
- return new TestStreamEnvironment(cluster, parallelism);
+ return new TestStreamEnvironment(
+ cluster,
+ parallelism,
+ jarFiles,
+ classpaths);
}
};
@@ -70,6 +112,21 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
}
/**
+ * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
+ * the given cluster with the given default parallelism.
+ *
+ * @param cluster The test cluster to run the test program on.
+ * @param parallelism The default parallelism for the test programs.
+ */
+ public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
+ setAsContext(
+ cluster,
+ parallelism,
+ Collections.<Path>emptyList(),
+ Collections.<URL>emptyList());
+ }
+
+ /**
* Resets the streaming context environment to null.
*/
public static void unsetAsContext() {
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 2bcf66d..f06cdb7 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -111,7 +111,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
- TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
+ TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
env.getConfig().enableObjectReuse();
env.setAsContext();
@@ -163,7 +163,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
- TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
+ TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
env.getConfig().disableObjectReuse();
env.setAsContext();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 2043cd0..ed141d1 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -86,7 +86,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
switch(mode){
case CLUSTER:
- new TestEnvironment(cluster, 4).setAsContext();
+ new TestEnvironment(cluster, 4, false).setAsContext();
break;
case CLUSTER_OBJECT_REUSE:
new TestEnvironment(cluster, 4, true).setAsContext();
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index aea8152..a30db40 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.CodeAnalysisMode;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -30,39 +31,71 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.util.Preconditions;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link ExecutionEnvironment} implementation which executes its jobs on a
+ * {@link LocalFlinkMiniCluster}.
+ */
public class TestEnvironment extends ExecutionEnvironment {
- private final LocalFlinkMiniCluster executor;
+ private final LocalFlinkMiniCluster miniCluster;
- private TestEnvironment lastEnv = null;
+ private final Collection<Path> jarFiles;
- @Override
- public JobExecutionResult getLastJobExecutionResult() {
- if (lastEnv == null) {
- return this.lastJobExecutionResult;
- }
- else {
- return lastEnv.getLastJobExecutionResult();
- }
- }
+ private final Collection<URL> classPaths;
+
+ private TestEnvironment lastEnv;
+
+ public TestEnvironment(
+ LocalFlinkMiniCluster miniCluster,
+ int parallelism,
+ boolean isObjectReuseEnabled,
+ Collection<Path> jarFiles,
+ Collection<URL> classPaths) {
+ this.miniCluster = Preconditions.checkNotNull(miniCluster);
+ this.jarFiles = Preconditions.checkNotNull(jarFiles);
+ this.classPaths = Preconditions.checkNotNull(classPaths);
- public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
- this.executor = executor;
setParallelism(parallelism);
// disabled to improve build time
getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
- }
-
- public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
- this(executor, parallelism);
if (isObjectReuseEnabled) {
getConfig().enableObjectReuse();
} else {
getConfig().disableObjectReuse();
}
+
+ lastEnv = null;
+ }
+
+ public TestEnvironment(
+ LocalFlinkMiniCluster executor,
+ int parallelism,
+ boolean isObjectReuseEnabled) {
+ this(
+ executor,
+ parallelism,
+ isObjectReuseEnabled,
+ Collections.<Path>emptyList(),
+ Collections.<URL>emptyList());
+ }
+
+ @Override
+ public JobExecutionResult getLastJobExecutionResult() {
+ if (lastEnv == null) {
+ return lastJobExecutionResult;
+ }
+ else {
+ return lastEnv.getLastJobExecutionResult();
+ }
}
@Override
@@ -76,7 +109,13 @@ public class TestEnvironment extends ExecutionEnvironment {
JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);
- this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
+ for (Path jarFile: jarFiles) {
+ jobGraph.addJar(jarFile);
+ }
+
+ jobGraph.setClasspaths(new ArrayList<>(classPaths));
+
+ this.lastJobExecutionResult = miniCluster.submitJobAndWait(jobGraph, false);
return this.lastJobExecutionResult;
}
@@ -93,7 +132,7 @@ public class TestEnvironment extends ExecutionEnvironment {
private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);
- Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
+ Optimizer pc = new Optimizer(new DataStatistics(), this.miniCluster.configuration());
return pc.compile(p);
}
@@ -101,11 +140,65 @@ public class TestEnvironment extends ExecutionEnvironment {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
- lastEnv = new TestEnvironment(executor, getParallelism(), getConfig().isObjectReuseEnabled());
+ lastEnv = new TestEnvironment(miniCluster, getParallelism(), getConfig().isObjectReuseEnabled());
return lastEnv;
}
};
initializeContextEnvironment(factory);
}
+
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the current {@link ExecutionEnvironment} to be a {@link TestEnvironment}. The test
+ * environment executes the given jobs on a Flink mini cluster with the given default
+ * parallelism and the additional jar files and class paths.
+ *
+ * @param miniCluster The mini cluster on which to execute the jobs
+ * @param parallelism The default parallelism
+ * @param jarFiles Additional jar files to execute the job with
+ * @param classPaths Additional class paths to execute the job with
+ */
+ public static void setAsContext(
+ final LocalFlinkMiniCluster miniCluster,
+ final int parallelism,
+ final Collection<Path> jarFiles,
+ final Collection<URL> classPaths) {
+
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return new TestEnvironment(
+ miniCluster,
+ parallelism,
+ false, jarFiles,
+ classPaths
+ );
+ }
+ };
+
+ initializeContextEnvironment(factory);
+ }
+
+
+ /**
+ * Sets the current {@link ExecutionEnvironment} to be a {@link TestEnvironment}. The test
+ * environment executes the given jobs on a Flink mini cluster with the given default
+ * parallelism and the additional jar files and class paths.
+ *
+ * @param miniCluster The mini cluster on which to execute the jobs
+ * @param parallelism The default parallelism
+ */
+ public static void setAsContext(final LocalFlinkMiniCluster miniCluster, final int parallelism) {
+ setAsContext(
+ miniCluster,
+ parallelism,
+ Collections.<Path>emptyList(),
+ Collections.<URL>emptyList());
+ }
+
+ public static void unsetAsContext() {
+ resetContextEnvironment();
+ }
}