You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 12:58:38 UTC

[flink] branch master updated: [FLINK-11369][tests] Remove legacy ZooKeeperHAJobManagerTest

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4facd91  [FLINK-11369][tests] Remove legacy ZooKeeperHAJobManagerTest
4facd91 is described below

commit 4facd91ccd0eaafde6c71641e4e1dd2b4d4b7770
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Thu Jan 31 20:58:30 2019 +0800

    [FLINK-11369][tests] Remove legacy ZooKeeperHAJobManagerTest
---
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 183 ---------------------
 1 file changed, 183 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
deleted file mode 100644
index dc9e8f5..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.ActorUtils;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.dispatcher.DispatcherHATest;
-import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.ExtendedActorSystem;
-import akka.actor.Identify;
-import akka.actor.Terminated;
-import akka.pattern.Patterns;
-import org.apache.curator.framework.CuratorFramework;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
- */
-public class ZooKeeperHAJobManagerTest extends TestLogger {
-
-	@ClassRule
-	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
-
-	@ClassRule
-	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-
-	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
-
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		final Future<Terminated> terminationFuture = system.terminate();
-		Await.ready(terminationFuture, TIMEOUT);
-	}
-
-	/**
-	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
-	 * leadership.
-	 */
-	@Test
-	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
-		final Configuration configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
-		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
-
-		try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices()) {
-
-			final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
-			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-			highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
-			highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
-			highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
-
-			final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
-			final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
-			otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
-
-			ActorRef jobManagerActorRef = null;
-			try {
-				jobManagerActorRef = JobManager.startJobManagerActors(
-					configuration,
-					system,
-					TestingUtils.defaultExecutor(),
-					TestingUtils.defaultExecutor(),
-					highAvailabilityServices,
-					NoOpMetricRegistry.INSTANCE,
-					Option.empty(),
-					TestingJobManager.class,
-					MemoryArchivist.class)._1();
-
-				waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
-
-				final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-				leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
-
-				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
-
-				final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
-
-				Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
-
-				Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
-
-				final JobID jobId = nonEmptyJobGraph.getJobID();
-				assertThat(jobIds, contains(jobId));
-
-				// revoke the leadership
-				leaderElectionService.notLeader();
-
-				Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
-
-				//noinspection RedundantCast
-				final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
-						((ExtendedActorSystem) system),
-						// we need the explicit cast to disambiguate the function call
-						(Callable<SubmittedJobGraph>) () -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
-
-				assertThat(recoveredJobGraph, is(notNullValue()));
-
-				otherSubmittedJobGraphStore.removeJobGraph(jobId);
-
-				jobIds = otherSubmittedJobGraphStore.getJobIds();
-
-				assertThat(jobIds, not(contains(jobId)));
-			} finally {
-				client.close();
-				otherClient.close();
-
-				if (jobManagerActorRef != null) {
-					ActorUtils.stopActor(jobManagerActorRef);
-				}
-			}
-		}
-	}
-
-	private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
-		Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
-	}
-}