You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:13 UTC
[08/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close calls from ZooKeeper based HA services
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 195baa1..c8459e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,7 +18,10 @@
package org.apache.flink.runtime.jobmanager;
-import akka.actor.*;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.typesafe.config.Config;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
@@ -55,8 +59,6 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
@@ -95,16 +97,15 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.mockito.ArgumentCaptor;
import scala.Option;
import scala.Some;
import scala.Tuple2;
@@ -117,6 +118,7 @@ import scala.reflect.ClassTag$;
import java.io.File;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -127,15 +129,14 @@ import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.Al
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered;
import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.apache.flink.runtime.testingUtils.TestingUtils.TESTING_TIMEOUT;
import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
public class JobManagerTest extends TestLogger {
@@ -144,6 +145,8 @@ public class JobManagerTest extends TestLogger {
private static ActorSystem system;
+ private HighAvailabilityServices highAvailabilityServices;
+
@BeforeClass
public static void setup() {
system = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -154,6 +157,17 @@ public class JobManagerTest extends TestLogger {
JavaTestKit.shutdownActorSystem(system);
}
+ @Before
+ public void setupTest() {
+ highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices = null;
+ }
+
@Test
public void testNullHostnameGoesToLocalhost() {
try {
@@ -589,32 +603,36 @@ public class JobManagerTest extends TestLogger {
Configuration config = new Configuration();
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
+ ActorRef jobManagerActor = JobManager.startJobManagerActors(
+ config,
+ system,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
+ TestingJobManager.class,
+ MemoryArchivist.class)._1();
+
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
ActorGateway jobManager = new AkkaActorGateway(
- JobManager.startJobManagerActors(
- config,
- system,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- TestingJobManager.class,
- MemoryArchivist.class)._1(),
- HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(
- AkkaUtils.getAkkaURL(system, jobManager.actor()));
+ jobManagerActor,
+ leaderId);
Configuration tmConfig = new Configuration();
tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- system,
- "localhost",
- scala.Option.<String>empty(),
- scala.Option.apply(leaderRetrievalService),
- true,
- TestingTaskManager.class);
+ tmConfig,
+ ResourceID.generate(),
+ system,
+ highAvailabilityServices,
+ "localhost",
+ scala.Option.<String>empty(),
+ true,
+ TestingTaskManager.class);
Future<Object> registrationFuture = jobManager
.ask(new NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft());
@@ -783,6 +801,7 @@ public class JobManagerTest extends TestLogger {
// Wait for failure
JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
assertEquals(JobStatus.FAILED, jobStatus.state());
+
}
@Test
@@ -805,25 +824,30 @@ public class JobManagerTest extends TestLogger {
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
TestingMemoryArchivist.class);
- jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
- archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ jobManager = new AkkaActorGateway(master._1(), leaderId);
+ archiver = new AkkaActorGateway(master._2(), leaderId);
ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- actorSystem,
- "localhost",
- Option.apply("tm"),
- Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
- true,
- TestingTaskManager.class);
+ config,
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.apply("tm"),
+ true,
+ TestingTaskManager.class);
- taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+ taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
// Wait until connected
Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -907,6 +931,10 @@ public class JobManagerTest extends TestLogger {
if (taskManager != null) {
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
}
+
+ if (actorSystem != null) {
+ actorSystem.awaitTermination(TESTING_TIMEOUT());
+ }
}
}
@@ -931,25 +959,30 @@ public class JobManagerTest extends TestLogger {
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
TestingMemoryArchivist.class);
- jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
- archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ jobManager = new AkkaActorGateway(master._1(), leaderId);
+ archiver = new AkkaActorGateway(master._2(), leaderId);
ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
config,
ResourceID.generate(),
actorSystem,
+ highAvailabilityServices,
"localhost",
Option.apply("tm"),
- Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
true,
TestingTaskManager.class);
- taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+ taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
// Wait until connected
Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1037,25 +1070,30 @@ public class JobManagerTest extends TestLogger {
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
TestingMemoryArchivist.class);
- jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
- archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ jobManager = new AkkaActorGateway(master._1(), leaderId);
+ archiver = new AkkaActorGateway(master._2(), leaderId);
ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- actorSystem,
- "localhost",
- Option.apply("tm"),
- Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
- true,
- TestingTaskManager.class);
+ config,
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.apply("tm"),
+ true,
+ TestingTaskManager.class);
- taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+ taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
// Wait until connected
Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1115,6 +1153,10 @@ public class JobManagerTest extends TestLogger {
if (taskManager != null) {
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
}
+
+ if (actorSystem != null) {
+ actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+ }
}
}
@@ -1137,28 +1179,33 @@ public class JobManagerTest extends TestLogger {
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
TestingMemoryArchivist.class);
- jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
- archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+ UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ TestingUtils.TESTING_TIMEOUT());
+
+ jobManager = new AkkaActorGateway(master._1(), leaderId);
+ archiver = new AkkaActorGateway(master._2(), leaderId);
Configuration tmConfig = new Configuration();
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- "localhost",
- Option.apply("tm"),
- Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
- true,
- TestingTaskManager.class);
+ tmConfig,
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ "localhost",
+ Option.apply("tm"),
+ true,
+ TestingTaskManager.class);
- taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+ taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
// Wait until connected
Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1274,6 +1321,10 @@ public class JobManagerTest extends TestLogger {
if (taskManager != null) {
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
}
+
+ if (actorSystem != null) {
+ actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+ }
}
}
@@ -1298,7 +1349,8 @@ public class JobManagerTest extends TestLogger {
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- configuration);
+ configuration,
+ highAvailabilityServices);
final TestProbe probe = TestProbe.apply(actorSystem);
final AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
@@ -1311,7 +1363,7 @@ public class JobManagerTest extends TestLogger {
JobManagerMessages.LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
- assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, leaderSessionMessage.leaderSessionID());
+ assertEquals(jmGateway.leaderSessionID(), leaderSessionMessage.leaderSessionID());
assertTrue(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful);
jmGateway.tell(
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index fcca173..37f503f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -68,6 +70,7 @@ public class JobSubmitTest {
private static ActorSystem jobManagerSystem;
private static ActorGateway jmGateway;
private static Configuration jmConfig;
+ private static HighAvailabilityServices highAvailabilityServices;
@BeforeClass
public static void setupJobManager() {
@@ -81,17 +84,20 @@ public class JobSubmitTest {
scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, listeningAddress);
+ highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+
// only start JobManager (no ResourceManager)
JobManager.startJobManagerActors(
jmConfig,
jobManagerSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
JobManager.class,
MemoryArchivist.class)._1();
try {
- LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig, false);
+ LeaderRetrievalService lrs = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
lrs,
@@ -104,10 +110,15 @@ public class JobSubmitTest {
}
@AfterClass
- public static void teardownJobmanager() {
+ public static void teardownJobmanager() throws Exception {
if (jobManagerSystem != null) {
jobManagerSystem.shutdown();
}
+
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices = null;
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index d2221c5..753c797 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -56,7 +57,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobManagerRunner.class)
-public class JobManagerRunnerMockTest {
+public class JobManagerRunnerMockTest extends TestLogger {
private JobManagerRunner runner;
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index b1bb548..942fcf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,9 +18,13 @@
package org.apache.flink.runtime.leaderelection;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -28,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
@@ -52,11 +57,15 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
private int numSlotsPerTM = 1;
private int parallelism = numTMs * numSlotsPerTM;
- private LeaderElectionRetrievalTestingCluster cluster = null;
+ private JobID jobId;
+ private TestingCluster cluster = null;
private JobGraph job = createBlockingJob(parallelism);
+ private TestingManualHighAvailabilityServices highAvailabilityServices;
@Before
public void before() throws TimeoutException, InterruptedException {
+ jobId = HighAvailabilityServices.DEFAULT_JOB_ID;
+
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
Configuration configuration = new Configuration();
@@ -69,7 +78,13 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 milli");
- cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
+ highAvailabilityServices = new TestingManualHighAvailabilityServices();
+
+ cluster = new TestingCluster(
+ configuration,
+ highAvailabilityServices,
+ true,
+ false);
cluster.start(false);
// wait for actors to be alive so that they have started their leader retrieval service
@@ -86,8 +101,15 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
public void testNotRestartedWhenLosingLeadership() throws Exception {
UUID leaderSessionID = UUID.randomUUID();
- cluster.grantLeadership(0, leaderSessionID);
- cluster.notifyRetrievalListeners(0, leaderSessionID);
+ highAvailabilityServices.grantLeadership(
+ jobId,
+ 0,
+ leaderSessionID);
+
+ highAvailabilityServices.notifyRetrievers(
+ jobId,
+ 0,
+ leaderSessionID);
cluster.waitForTaskManagersToBeRegistered(timeout);
@@ -108,7 +130,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
- cluster.revokeLeadership();
+ highAvailabilityServices.revokeLeadership(jobId);
executionGraph.getTerminationFuture().get(30, TimeUnit.SECONDS);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 7ae9974..bbcbbf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -18,8 +18,11 @@
package org.apache.flink.runtime.leaderelection;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -28,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -58,12 +62,16 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
private int numSlotsPerTM = 2;
private int parallelism = numTMs * numSlotsPerTM;
+ private JobID jobId;
private Configuration configuration;
- private LeaderElectionRetrievalTestingCluster cluster = null;
+ private TestingManualHighAvailabilityServices highAvailabilityServices;
+ private TestingCluster cluster = null;
private JobGraph job = createBlockingJob(parallelism);
@Before
public void before() throws Exception {
+ jobId = HighAvailabilityServices.DEFAULT_JOB_ID;
+
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
configuration = new Configuration();
@@ -72,7 +80,13 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
- cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
+ highAvailabilityServices = new TestingManualHighAvailabilityServices();
+
+ cluster = new TestingCluster(
+ configuration,
+ highAvailabilityServices,
+ true,
+ false);
cluster.start(false); // TaskManagers don't have to register at the JobManager
cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
@@ -96,9 +110,9 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
UUID leaderSessionID2 = UUID.randomUUID();
// first make JM(0) the leader
- cluster.grantLeadership(0, leaderSessionID1);
+ highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID1);
// notify all listeners
- cluster.notifyRetrievalListeners(0, leaderSessionID1);
+ highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID1);
cluster.waitForTaskManagersToBeRegistered(timeout);
@@ -114,9 +128,9 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
// make the JM(1) the new leader
- cluster.grantLeadership(1, leaderSessionID2);
+ highAvailabilityServices.grantLeadership(jobId, 1, leaderSessionID2);
// notify all listeners about the event
- cluster.notifyRetrievalListeners(1, leaderSessionID2);
+ highAvailabilityServices.notifyRetrievers(jobId, 1, leaderSessionID2);
Await.ready(jobRemoval, timeout);
@@ -133,7 +147,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
// try to resubmit now the non-blocking job, it should complete successfully
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
- cluster.submitJobAndWait(job, false, timeout, new TestingLeaderRetrievalService(jm2.path(), jm2.leaderSessionID()));
+ cluster.submitJobAndWait(job, false, timeout);
}
/**
@@ -146,8 +160,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
UUID leaderSessionID = UUID.randomUUID();
UUID newLeaderSessionID = UUID.randomUUID();
- cluster.grantLeadership(0, leaderSessionID);
- cluster.notifyRetrievalListeners(0, leaderSessionID);
+ highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID);
+ highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID);
cluster.waitForTaskManagersToBeRegistered(timeout);
@@ -163,7 +177,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
// only notify the JMs about the new leader JM(1)
- cluster.grantLeadership(1, newLeaderSessionID);
+ highAvailabilityServices.grantLeadership(jobId, 1, newLeaderSessionID);
// job should be removed anyway
Await.ready(jobRemoval, timeout);
@@ -179,8 +193,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
UUID leaderSessionID = UUID.randomUUID();
UUID newLeaderSessionID = UUID.randomUUID();
- cluster.grantLeadership(0, leaderSessionID);
- cluster.notifyRetrievalListeners(0, leaderSessionID);
+ highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID);
+ highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID);
cluster.waitForTaskManagersToBeRegistered(timeout);
@@ -196,7 +210,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
Future<Object> jobRemoval = jm.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
// notify listeners (TMs) about the leader change
- cluster.notifyRetrievalListeners(1, newLeaderSessionID);
+ highAvailabilityServices.notifyRetrievers(jobId, 1, newLeaderSessionID);
Await.ready(jobRemoval, timeout);
}
@@ -213,8 +227,8 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
FiniteDuration shortTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
- cluster.grantLeadership(0, leaderSessionID);
- cluster.notifyRetrievalListeners(0, leaderSessionID);
+ highAvailabilityServices.grantLeadership(jobId, 0, leaderSessionID);
+ highAvailabilityServices.notifyRetrievers(jobId, 0, leaderSessionID);
cluster.waitForTaskManagersToBeRegistered(timeout);
@@ -232,7 +246,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
LOG.info("Make JM(0) again the leader. This should first revoke the leadership.");
// make JM(0) again the leader --> this implies first a leadership revocation
- cluster.grantLeadership(0, newLeaderSessionID);
+ highAvailabilityServices.grantLeadership(jobId, 0, newLeaderSessionID);
Await.ready(jobRemoval, timeout);
@@ -250,7 +264,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
LOG.info("Notify TMs about the new (old) leader.");
// notify the TMs about the new (old) leader
- cluster.notifyRetrievalListeners(0, newLeaderSessionID);
+ highAvailabilityServices.notifyRetrievers(jobId,0, newLeaderSessionID);
cluster.waitForTaskManagersToBeRegistered(timeout);
@@ -258,7 +272,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
// try to resubmit now the non-blocking job, it should complete successfully
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
- cluster.submitJobAndWait(job, false, timeout, new TestingLeaderRetrievalService(leaderGateway.path(), leaderGateway.leaderSessionID()));
+ cluster.submitJobAndWait(job, false, timeout);
}
public JobGraph createBlockingJob(int parallelism) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
deleted file mode 100644
index 1cab0ea..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * A testing cluster which allows to manually trigger grantLeadership and notifyRetrievalListener
- * events. The grantLeadership event assigns the specified JobManager the leadership. The
- * notifyRetrievalListener notifies all listeners that the specified JobManager (index) has been
- * granted the leadership.
- */
-public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
-
- private final Configuration userConfiguration;
- private final boolean useSingleActorSystem;
-
- public List<TestingLeaderElectionService> leaderElectionServices;
- public List<TestingLeaderRetrievalService> leaderRetrievalServices;
-
- private int leaderIndex = -1;
-
- public LeaderElectionRetrievalTestingCluster(
- Configuration userConfiguration,
- boolean singleActorSystem,
- boolean synchronousDispatcher) {
- super(userConfiguration, singleActorSystem, synchronousDispatcher);
-
- this.userConfiguration = userConfiguration;
- this.useSingleActorSystem = singleActorSystem;
-
- leaderElectionServices = new ArrayList<>();
- leaderRetrievalServices = new ArrayList<>();
- }
-
- @Override
- public Configuration userConfiguration() {
- return this.userConfiguration;
- }
-
- @Override
- public boolean useSingleActorSystem() {
- return useSingleActorSystem;
- }
-
- @Override
- public Option<LeaderElectionService> createLeaderElectionService() {
- leaderElectionServices.add(new TestingLeaderElectionService());
-
- LeaderElectionService result = leaderElectionServices.get(leaderElectionServices.size() - 1);
-
- return Option.apply(result);
- }
-
- @Override
- public LeaderRetrievalService createLeaderRetrievalService() {
- leaderRetrievalServices.add(new TestingLeaderRetrievalService(
- null,
- null));
-
- return leaderRetrievalServices.get(leaderRetrievalServices.size() - 1);
- }
-
- @Override
- public int getNumberOfJobManagers() {
- return this.originalConfiguration().getInteger(
- ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
- ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
- }
-
- public void grantLeadership(int index, UUID leaderSessionID) {
- if(leaderIndex >= 0) {
- // first revoke leadership
- leaderElectionServices.get(leaderIndex).notLeader();
- }
-
- // make the JM with index the new leader
- leaderElectionServices.get(index).isLeader(leaderSessionID);
-
- leaderIndex = index;
- }
-
- public void notifyRetrievalListeners(int index, UUID leaderSessionID) {
- String address = jobManagerActors().get().apply(index).path().toString();
-
- for(TestingLeaderRetrievalService service: leaderRetrievalServices) {
- service.notifyListener(address, leaderSessionID);
- }
- }
-
- public void revokeLeadership() {
- if (leaderIndex >= 0) {
- leaderElectionServices.get(leaderIndex).notLeader();
- leaderIndex = -1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 16779fa..d456083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -63,4 +63,8 @@ public class TestingLeaderElectionService implements LeaderElectionService {
contender = null;
hasLeadership = false;
}
+
+ public String getAddress() {
+ return contender.getAddress();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index 887772a..15d3bde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -35,6 +35,10 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
private volatile LeaderRetrievalListener listener;
+ public TestingLeaderRetrievalService() {
+ this(null, null);
+ }
+
public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) {
this.leaderAddress = leaderAddress;
this.leaderSessionID = leaderSessionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index bc8c0b60..6efd270 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
@@ -37,6 +36,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -48,6 +48,7 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -59,6 +60,11 @@ import static org.junit.Assert.*;
public class ZooKeeperLeaderElectionTest extends TestLogger {
private TestingServer testingServer;
+
+ private Configuration configuration;
+
+ private CuratorFramework client;
+
private static final String TEST_URL = "akka//user/jobmanager";
private static final FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
@@ -71,17 +77,26 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
} catch (Exception e) {
throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
}
+
+ configuration = new Configuration();
+
+ configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+ configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+ client = ZooKeeperUtils.startCuratorFramework(configuration);
}
@After
- public void after() {
- try {
- testingServer.stop();
- } catch (Exception e) {
- throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
+ public void after() throws IOException {
+ if (client != null) {
+ client.close();
+ client = null;
}
- testingServer = null;
+ if (testingServer != null) {
+ testingServer.stop();
+ testingServer = null;
+ }
}
/**
@@ -89,16 +104,12 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
*/
@Test
public void testZooKeeperLeaderElectionRetrieval() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
ZooKeeperLeaderElectionService leaderElectionService = null;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
try {
- leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+ leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
TestingContender contender = new TestingContender(TEST_URL, leaderElectionService);
TestingListener listener = new TestingListener();
@@ -134,10 +145,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
*/
@Test
public void testZooKeeperReelection() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
int num = 10;
@@ -149,14 +156,14 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
TestingListener listener = new TestingListener();
try {
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
LOG.debug("Start leader retrieval service for the TestingListener.");
leaderRetrievalService.start(listener);
for (int i = 0; i < num; i++) {
- leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(configuration);
+ leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
contenders[i] = new TestingContender(TEST_URL + "_" + i, leaderElectionService[i]);
LOG.debug("Start leader election service for contender #{}.", i);
@@ -217,10 +224,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
*/
@Test
public void testZooKeeperReelectionWithReplacement() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
int num = 3;
int numTries = 30;
@@ -231,12 +234,12 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
TestingListener listener = new TestingListener();
try {
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
leaderRetrievalService.start(listener);
for (int i = 0; i < num; i++) {
- leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(configuration);
+ leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
contenders[i] = new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionService[i]);
leaderElectionService[i].start(contenders[i]);
@@ -261,7 +264,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
// stop leader election service = revoke leadership
leaderElectionService[index].stop();
// create new leader election service which takes part in the leader election
- leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService(configuration);
+ leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService(client, configuration);
contenders[index] = new TestingContender(
TEST_URL + "_" + index + "_" + (lastTry + 1),
leaderElectionService[index]);
@@ -295,9 +298,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
final String FAULTY_CONTENDER_URL = "faultyContender";
final String leaderPath = "/leader";
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -308,9 +308,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
TestingContender contender;
try {
- leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
- leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+ leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
+ leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
contender = new TestingContender(TEST_URL, leaderElectionService);
@@ -379,18 +379,13 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
*/
@Test
public void testExceptionForwarding() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
ZooKeeperLeaderElectionService leaderElectionService = null;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
TestingListener listener = new TestingListener();
TestingContender testingContender;
CuratorFramework client;
- final CreateBuilder mockCreateBuilder = mock(CreateBuilder.class);
- final ProtectACLCreateModePathAndBytesable<String> mockCreateParentsIfNeeded = mock (ProtectACLCreateModePathAndBytesable.class);
+ final CreateBuilder mockCreateBuilder = mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS);
final Exception testException = new Exception("Test exception");
try {
@@ -414,12 +409,14 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
doAnswer(answer).when(client).create();
- when(mockCreateBuilder.creatingParentsIfNeeded()).thenReturn(mockCreateParentsIfNeeded);
- when(mockCreateParentsIfNeeded.withMode(Matchers.any(CreateMode.class))).thenReturn(mockCreateParentsIfNeeded);
- when(mockCreateParentsIfNeeded.forPath(Matchers.any(String.class), Matchers.any(byte[].class))).thenThrow(testException);
+ when(
+ mockCreateBuilder
+ .creatingParentsIfNeeded()
+ .withMode(Matchers.any(CreateMode.class))
+ .forPath(anyString(), any(byte[].class))).thenThrow(testException);
leaderElectionService = new ZooKeeperLeaderElectionService(client, "/latch", "/leader");
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
testingContender = new TestingContender(TEST_URL, leaderElectionService);
@@ -442,34 +439,33 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
}
/**
- * Tests that there is no information left in the ZooKeeper cluster after all JobManagers
- * have terminated. In other words, checks that the ZooKeeperLeaderElection service uses
+ * Tests that there is no information left in the ZooKeeper cluster after the ZooKeeper client
+ * has terminated. In other words, checks that the ZooKeeperLeaderElection service uses
* ephemeral nodes.
*/
@Test
public void testEphemeralZooKeeperNodes() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-
ZooKeeperLeaderElectionService leaderElectionService;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
TestingContender testingContender;
TestingListener listener;
CuratorFramework client = null;
+ CuratorFramework client2 = null;
NodeCache cache = null;
try {
- leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration);
- leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
+ client = ZooKeeperUtils.startCuratorFramework(configuration);
+ client2 = ZooKeeperUtils.startCuratorFramework(configuration);
+
+ leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client2, configuration);
testingContender = new TestingContender(TEST_URL, leaderElectionService);
listener = new TestingListener();
- client = ZooKeeperUtils.startCuratorFramework(configuration);
final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
- cache = new NodeCache(client, leaderPath);
+ cache = new NodeCache(client2, leaderPath);
ExistsCacheListener existsListener = new ExistsCacheListener(cache);
DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache);
@@ -489,6 +485,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
leaderElectionService.stop();
+ // now stop the underlying client
+ client.close();
+
Future<Boolean> deletedFuture = deletedCacheListener.nodeDeleted();
// make sure that the leader node has been deleted
@@ -497,7 +496,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
leaderRetrievalService.start(listener);
try {
- listener.waitForNewLeader(1000);
+ listener.waitForNewLeader(1000L);
fail("TimeoutException was expected because there is no leader registered and " +
"thus there shouldn't be any leader information in ZooKeeper.");
@@ -513,8 +512,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
cache.close();
}
- if (client != null) {
- client.close();
+ if (client2 != null) {
+ client2.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index b79093f..0ea47f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,18 +18,17 @@
package org.apache.flink.runtime.leaderelection;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -53,29 +52,41 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
private TestingServer testingServer;
+ private Configuration config;
+
+ private HighAvailabilityServices highAvailabilityServices;
+
@Before
- public void before() {
- try {
- testingServer = new TestingServer();
- } catch (Exception e) {
- throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
- }
+ public void before() throws Exception {
+ testingServer = new TestingServer();
+
+ config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+
+ highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
}
@After
- public void after() {
+ public void after() throws Exception {
+ // close the HA services (and their ZooKeeper client) before stopping the server they depend on
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices = null;
+ }
+
if(testingServer != null) {
- try {
- testingServer.stop();
- } catch (IOException e) {
- throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
- }
+ testingServer.stop();
+
testingServer = null;
}
}
/**
- * Tests that LeaderRetrievalUtils.findConnectingAdress finds the correct connecting address
+ * Tests that LeaderRetrievalUtils.findConnectingAddress finds the correct connecting address
* in case of an old leader address in ZooKeeper and a subsequent election of a new leader.
* The findConnectingAddress should block until the new leader has been elected and his
* address has been written to ZooKeeper.
@@ -83,13 +94,9 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
@Test
public void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception {
FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
- Configuration config = new Configuration();
long sleepingTime = 1000;
- config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-
LeaderElectionService leaderElectionService = null;
LeaderElectionService faultyLeaderElectionService;
@@ -98,12 +105,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
Thread thread;
- CuratorFramework[] client = new CuratorFramework[2];
-
try {
- client[0] = ZooKeeperUtils.startCuratorFramework(config);
- client[1] = ZooKeeperUtils.startCuratorFramework(config);
-
String wrongAddress = AkkaRpcServiceUtils.getRpcUrl(
"1.1.1.1",
1234,
@@ -126,7 +128,6 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
}
InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
- String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), correctInetSocketAddress.getPort());
String correctAddress = AkkaRpcServiceUtils.getRpcUrl(
localHost.getHostName(),
@@ -135,18 +136,21 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
config);
- faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
+ faultyLeaderElectionService = highAvailabilityServices.getJobManagerLeaderElectionService(
+ HighAvailabilityServices.DEFAULT_JOB_ID);
TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);
faultyLeaderElectionService.start(wrongLeaderAddressContender);
- FindConnectingAddress findConnectingAddress = new FindConnectingAddress(config, timeout);
+ FindConnectingAddress findConnectingAddress = new FindConnectingAddress(
+ timeout,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID));
thread = new Thread(findConnectingAddress);
thread.start();
- leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[1], config);
+ leaderElectionService = highAvailabilityServices.getJobManagerLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID);
TestingContender correctLeaderAddressContender = new TestingContender(correctAddress, leaderElectionService);
Thread.sleep(sleepingTime);
@@ -174,14 +178,6 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
if (leaderElectionService != null) {
leaderElectionService.stop();
}
-
- if (client[0] != null) {
- client[0].close();
- }
-
- if (client[1] != null) {
- client[1].close();
- }
}
}
@@ -192,13 +188,9 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
*/
@Test
public void testTimeoutOfFindConnectingAddress() throws Exception {
- Configuration config = new Configuration();
- config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
-
- FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+ FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
- LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
+ LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
assertEquals(InetAddress.getLocalHost(), result);
@@ -206,22 +198,25 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
static class FindConnectingAddress implements Runnable {
- private final Configuration config;
private final FiniteDuration timeout;
+ private final LeaderRetrievalService leaderRetrievalService;
private InetAddress result;
private Exception exception;
- public FindConnectingAddress(Configuration config, FiniteDuration timeout) {
- this.config = config;
+ public FindConnectingAddress(
+ FiniteDuration timeout,
+ LeaderRetrievalService leaderRetrievalService) {
this.timeout = timeout;
+ this.leaderRetrievalService = leaderRetrievalService;
}
@Override
public void run() {
try {
- LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, false);
- result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
+ result = LeaderRetrievalUtils.findConnectingAddress(
+ leaderRetrievalService,
+ timeout);
} catch (Exception e) {
exception = e;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 100c83d..58f2231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -26,10 +26,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
@@ -37,15 +36,17 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.net.InetAddress;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-public class TaskManagerMetricsTest {
+public class TaskManagerMetricsTest extends TestLogger {
/**
* Tests the metric registry life cycle on JobManager re-connects.
@@ -53,6 +54,9 @@ public class TaskManagerMetricsTest {
@Test
public void testMetricRegistryLifeCycle() throws Exception {
ActorSystem actorSystem = null;
+
+ HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+
try {
actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -64,11 +68,10 @@ public class TaskManagerMetricsTest {
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
JobManager.class,
MemoryArchivist.class)._1();
- LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
-
// ================================================================
// Start TaskManager
// ================================================================
@@ -94,7 +97,7 @@ public class TaskManagerMetricsTest {
taskManagerServices.getMemoryManager(),
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
- leaderRetrievalService,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
tmRegistry);
final ActorRef taskManager = actorSystem.actorOf(tmProps);
@@ -107,20 +110,21 @@ public class TaskManagerMetricsTest {
getTestActor());
// wait for the TM to be registered
- expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+ TaskManagerMessages.RegisteredAtJobManager registeredAtJobManager = expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
+ UUID leaderId = registeredAtJobManager.leaderId();
// trigger re-registration of TM; this should include a disconnect from the current JM
taskManager.tell(
new TaskManagerMessages.JobManagerLeaderAddress(
jobManager.path().toString(),
- HighAvailabilityServices.DEFAULT_LEADER_ID),
+ leaderId),
jobManager);
// wait for re-registration to be completed
taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
getTestActor());
- expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+ expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
}
};
}};
@@ -135,6 +139,10 @@ public class TaskManagerMetricsTest {
if (actorSystem != null) {
actorSystem.shutdown();
}
+
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index cecfe6a..973fddf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -44,7 +46,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -52,23 +53,24 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.net.InetAddress;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-public class TaskManagerComponentsStartupShutdownTest {
+public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
/**
* Makes sure that all components are shut down when the TaskManager
* actor is shut down.
*/
@Test
- public void testComponentsStartupShutdown() {
+ public void testComponentsStartupShutdown() throws Exception {
final String[] TMP_DIR = new String[] { ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH };
final Time timeout = Time.seconds(100);
@@ -80,21 +82,28 @@ public class TaskManagerComponentsStartupShutdownTest {
config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1);
ActorSystem actorSystem = null;
+
+ HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+
try {
actorSystem = AkkaUtils.createLocalActorSystem(config);
- final ActorRef jobManager = JobManager.startJobManagerActors(
+ jobManager = JobManager.startJobManagerActors(
config,
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
JobManager.class,
MemoryArchivist.class)._1();
FlinkResourceManager.startResourceManagerActors(
config,
actorSystem,
- LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager),
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
StandaloneResourceManager.class);
final int numberOfSlots = 1;
@@ -137,7 +146,8 @@ public class TaskManagerComponentsStartupShutdownTest {
network.start();
- LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
+ LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID);
MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
@@ -154,18 +164,20 @@ public class TaskManagerComponentsStartupShutdownTest {
leaderRetrievalService,
new MetricRegistry(metricRegistryConfiguration));
- final ActorRef taskManager = actorSystem.actorOf(tmProps);
+ taskManager = actorSystem.actorOf(tmProps);
+
+ final ActorRef finalTaskManager = taskManager;
new JavaTestKit(actorSystem) {{
// wait for the TaskManager to be registered
- new Within(new FiniteDuration(5000, TimeUnit.SECONDS)) {
+ new Within(new FiniteDuration(5000L, TimeUnit.SECONDS)) {
@Override
protected void run() {
- taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ finalTaskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
getTestActor());
- expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+ expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
}
};
}};
@@ -184,15 +196,16 @@ public class TaskManagerComponentsStartupShutdownTest {
assertTrue(network.isShutdown());
assertTrue(ioManager.isProperlyShutDown());
assertTrue(memManager.isShutdown());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
+ TestingUtils.stopActorsGracefully(Arrays.asList(jobManager, taskManager));
+
if (actorSystem != null) {
actorSystem.shutdown();
+
+ actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
}
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index c0d0455..a760760 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.junit.Test;
import scala.Tuple2;
@@ -44,16 +47,23 @@ import static org.junit.Assert.*;
public class TaskManagerConfigurationTest {
@Test
- public void testUsePreconfiguredNetworkInterface() {
+ public void testUsePreconfiguredNetworkInterface() throws Exception {
+ final String TEST_HOST_NAME = "testhostname";
+
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+
+ HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ Executors.directExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
try {
- final String TEST_HOST_NAME = "testhostname";
- Configuration config = new Configuration();
- config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
- Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config);
+ Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
// validate the configured test host name
assertEquals(TEST_HOST_NAME, address._1());
@@ -61,30 +71,37 @@ public class TaskManagerConfigurationTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
+ } finally {
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
@Test
- public void testActorSystemPortConfig() {
- try {
- // config with pre-configured hostname to speed up tests (no interface selection)
- Configuration config = new Configuration();
- config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+ public void testActorSystemPortConfig() throws Exception {
+ // config with pre-configured hostname to speed up tests (no interface selection)
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+
+ HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ Executors.directExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+ try {
// auto port
- assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config)._2());
+ assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
// pre-defined port
final int testPort = 22551;
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, testPort);
- assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config)._2());
+ assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
// invalid port
try {
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
- TaskManager.selectNetworkInterfaceAndPort(config);
+ TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
fail("should fail with an exception");
}
catch (IllegalConfigurationException e) {
@@ -94,7 +111,7 @@ public class TaskManagerConfigurationTest {
// invalid port
try {
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 100000);
- TaskManager.selectNetworkInterfaceAndPort(config);
+ TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
fail("should fail with an exception");
}
catch (IllegalConfigurationException e) {
@@ -104,6 +121,8 @@ public class TaskManagerConfigurationTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
+ } finally {
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
@@ -136,7 +155,7 @@ public class TaskManagerConfigurationTest {
}
@Test
- public void testNetworkInterfaceSelection() {
+ public void testNetworkInterfaceSelection() throws Exception {
ServerSocket server;
String hostname = "localhost";
@@ -149,20 +168,27 @@ public class TaskManagerConfigurationTest {
return;
}
- try {
- // open a server port to allow the system to connect
- Configuration config = new Configuration();
+ // open a server port to allow the system to connect
+ Configuration config = new Configuration();
+
+ config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
+ config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
+ HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ Executors.directExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
- assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config)._1());
+ try {
+ assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._1());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
finally {
+ highAvailabilityServices.closeAndCleanupAllData();
+
try {
server.close();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index d904004..130610c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -23,17 +23,20 @@ import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import scala.Some;
import scala.Tuple2;
@@ -56,7 +59,7 @@ import static org.junit.Assert.fail;
/**
* Tests that the TaskManager process properly exits when the TaskManager actor dies.
*/
-public abstract class TaskManagerProcessReapingTestBase {
+public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
/**
* Called after the task manager has been started up. After calling this
@@ -72,12 +75,25 @@ public abstract class TaskManagerProcessReapingTestBase {
}
@Test
- public void testReapProcessOnFailure() {
+ public void testReapProcessOnFailure() throws Exception {
Process taskManagerProcess = null;
ActorSystem jmActorSystem = null;
final StringWriter processOutput = new StringWriter();
+ final Configuration config = new Configuration();
+
+ final String jobManagerHostname = "localhost";
+ final int jobManagerPort = NetUtils.getAvailablePort();
+
+ config.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
+ config.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
try {
String javaCommand = getJavaCommandPath();
@@ -93,29 +109,23 @@ public abstract class TaskManagerProcessReapingTestBase {
tempLogFile.deleteOnExit();
CommonTestUtils.printLog4jDebugConfig(tempLogFile);
- final int jobManagerPort = NetUtils.getAvailablePort();
-
// start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
- jmActorSystem = AkkaUtils.createActorSystem(
- new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
+ Tuple2<String, Object> localAddress = new Tuple2<String, Object>(jobManagerHostname, jobManagerPort);
+ jmActorSystem = AkkaUtils.createActorSystem(config, new Some<>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(
- new Configuration(),
+ config,
jmActorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
JobManager.class,
MemoryArchivist.class)._1;
- // start a ResourceManager
- StandaloneLeaderRetrievalService standaloneLeaderRetrievalService =
- new StandaloneLeaderRetrievalService(AkkaUtils.getAkkaURL(jmActorSystem, jmActor));
-
FlinkResourceManager.startResourceManagerActors(
new Configuration(),
jmActorSystem,
- standaloneLeaderRetrievalService,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
StandaloneResourceManager.class);
final int taskManagerPort = NetUtils.getAvailablePort();
@@ -205,6 +215,9 @@ public abstract class TaskManagerProcessReapingTestBase {
if (jmActorSystem != null) {
jmActorSystem.shutdown();
}
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
+ }
}
}
@@ -222,18 +235,28 @@ public abstract class TaskManagerProcessReapingTestBase {
public static class TaskManagerTestEntryPoint {
- public static void main(String[] args) {
- try {
- int jobManagerPort = Integer.parseInt(args[0]);
- int taskManagerPort = Integer.parseInt(args[1]);
+ public static void main(String[] args) throws Exception {
+ int jobManagerPort = Integer.parseInt(args[0]);
+ int taskManagerPort = Integer.parseInt(args[1]);
- Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
- cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+ cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
- TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ cfg,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
+ try {
+ TaskManager.runTaskManager(
+ "localhost",
+ ResourceID.generate(),
+ taskManagerPort,
+ cfg,
+ highAvailabilityServices);
// wait forever
Object lock = new Object();
@@ -243,6 +266,8 @@ public abstract class TaskManagerProcessReapingTestBase {
}
catch (Throwable t) {
System.exit(1);
+ } finally {
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
}