You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/02 09:29:47 UTC

[09/16] flink git commit: [hotfix] [tests] Rename UtilsTest to YarnFlinkResourceManagerTest.

[hotfix] [tests] Rename UtilsTest to YarnFlinkResourceManagerTest.

Test was misnamed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8425e5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8425e5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8425e5b

Branch: refs/heads/release-1.5
Commit: a8425e5ba6d36f6a8408e47da7f71cf7a0c6eb73
Parents: 4409ba2
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 11:18:32 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:15 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/yarn/UtilsTest.java   | 244 -------------------
 .../yarn/YarnFlinkResourceManagerTest.java      | 244 +++++++++++++++++++
 2 files changed, 244 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index b7a38b0..0000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
-import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link Utils}.
- */
-public class UtilsTest extends TestLogger {
-
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	@Test
-	public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
-		new JavaTestKit(system) {{
-
-			final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
-
-			Configuration flinkConfig = new Configuration();
-			YarnConfiguration yarnConfig = new YarnConfiguration();
-			SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(
-				null,
-				null);
-			String applicationMasterHostName = "localhost";
-			String webInterfaceURL = "foobar";
-			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
-				1L, 1L, 1L, 1, new HashMap<String, String>());
-			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
-			int yarnHeartbeatIntervalMillis = 1000;
-			int maxFailedContainers = 10;
-			int numInitialTaskManagers = 5;
-			final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
-			AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
-			NMClient nodeManagerClient = mock(NMClient.class);
-			UUID leaderSessionID = UUID.randomUUID();
-
-			final List<Container> containerList = new ArrayList<>();
-
-			for (int i = 0; i < numInitialTaskManagers; i++) {
-				Container mockContainer = mock(Container.class);
-				when(mockContainer.getId()).thenReturn(
-					ContainerId.newInstance(
-						ApplicationAttemptId.newInstance(
-							ApplicationId.newInstance(System.currentTimeMillis(), 1),
-							1),
-						i));
-				when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-				containerList.add(mockContainer);
-			}
-
-			doAnswer(new Answer() {
-				int counter = 0;
-				@Override
-				public Object answer(InvocationOnMock invocation) throws Throwable {
-					if (counter < containerList.size()) {
-						callbackHandler.onContainersAllocated(
-							Collections.singletonList(
-								containerList.get(counter++)
-							));
-					}
-					return null;
-				}
-			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
-			final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>();
-			final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>();
-
-			doAnswer(
-				(InvocationOnMock invocation) -> {
-					Container container = (Container) invocation.getArguments()[0];
-					resourceManagerFuture.thenCombine(leaderGatewayFuture,
-						(resourceManagerGateway, leaderGateway) -> {
-						resourceManagerGateway.tell(
-							new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-							leaderGateway);
-						return null;
-						});
-					return null;
-				})
-				.when(nodeManagerClient)
-				.startContainer(
-					Matchers.any(Container.class),
-					Matchers.any(ContainerLaunchContext.class));
-
-			ActorRef resourceManager = null;
-			ActorRef leader1;
-
-			try {
-				leader1 = system.actorOf(
-					Props.create(
-						TestingUtils.ForwardingActor.class,
-						getRef(),
-						Option.apply(leaderSessionID)
-					));
-
-				resourceManager = system.actorOf(
-					Props.create(
-						TestingYarnFlinkResourceManager.class,
-						flinkConfig,
-						yarnConfig,
-						leaderRetrievalService,
-						applicationMasterHostName,
-						webInterfaceURL,
-						taskManagerParameters,
-						taskManagerLaunchContext,
-						yarnHeartbeatIntervalMillis,
-						maxFailedContainers,
-						numInitialTaskManagers,
-						callbackHandler,
-						resourceManagerClient,
-						nodeManagerClient
-					));
-
-				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
-				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
-				final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
-
-				leaderGatewayFuture.complete(leader1Gateway);
-				resourceManagerFuture.complete(resourceManagerGateway);
-
-				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
-
-				for (int i = 0; i < containerList.size(); i++) {
-					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
-				}
-
-				Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
-
-				Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
-
-				leaderRetrievalService.notifyListener(null, null);
-
-				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
-				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
-
-				for (Container container: containerList) {
-					resourceManagerGateway.tell(
-						new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-						leader1Gateway);
-				}
-
-				for (int i = 0; i < containerList.size(); i++) {
-					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
-				}
-
-				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft());
-
-				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
-
-				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
-			} finally {
-				if (resourceManager != null) {
-					resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-				}
-			}
-		}};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a8425e5b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
new file mode 100644
index 0000000..10b2ce9
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link YarnFlinkResourceManager}.
+ */
+public class YarnFlinkResourceManagerTest extends TestLogger {
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	@Test
+	public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
+		new JavaTestKit(system) {{
+
+			final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
+
+			Configuration flinkConfig = new Configuration();
+			YarnConfiguration yarnConfig = new YarnConfiguration();
+			SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(
+				null,
+				null);
+			String applicationMasterHostName = "localhost";
+			String webInterfaceURL = "foobar";
+			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
+				1L, 1L, 1L, 1, new HashMap<String, String>());
+			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
+			int yarnHeartbeatIntervalMillis = 1000;
+			int maxFailedContainers = 10;
+			int numInitialTaskManagers = 5;
+			final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
+			AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
+			NMClient nodeManagerClient = mock(NMClient.class);
+			UUID leaderSessionID = UUID.randomUUID();
+
+			final List<Container> containerList = new ArrayList<>();
+
+			for (int i = 0; i < numInitialTaskManagers; i++) {
+				Container mockContainer = mock(Container.class);
+				when(mockContainer.getId()).thenReturn(
+					ContainerId.newInstance(
+						ApplicationAttemptId.newInstance(
+							ApplicationId.newInstance(System.currentTimeMillis(), 1),
+							1),
+						i));
+				when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+				containerList.add(mockContainer);
+			}
+
+			doAnswer(new Answer() {
+				int counter = 0;
+				@Override
+				public Object answer(InvocationOnMock invocation) throws Throwable {
+					if (counter < containerList.size()) {
+						callbackHandler.onContainersAllocated(
+							Collections.singletonList(
+								containerList.get(counter++)
+							));
+					}
+					return null;
+				}
+			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+			final CompletableFuture<AkkaActorGateway> resourceManagerFuture = new CompletableFuture<>();
+			final CompletableFuture<AkkaActorGateway> leaderGatewayFuture = new CompletableFuture<>();
+
+			doAnswer(
+				(InvocationOnMock invocation) -> {
+					Container container = (Container) invocation.getArguments()[0];
+					resourceManagerFuture.thenCombine(leaderGatewayFuture,
+						(resourceManagerGateway, leaderGateway) -> {
+						resourceManagerGateway.tell(
+							new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+							leaderGateway);
+						return null;
+						});
+					return null;
+				})
+				.when(nodeManagerClient)
+				.startContainer(
+					Matchers.any(Container.class),
+					Matchers.any(ContainerLaunchContext.class));
+
+			ActorRef resourceManager = null;
+			ActorRef leader1;
+
+			try {
+				leader1 = system.actorOf(
+					Props.create(
+						TestingUtils.ForwardingActor.class,
+						getRef(),
+						Option.apply(leaderSessionID)
+					));
+
+				resourceManager = system.actorOf(
+					Props.create(
+						TestingYarnFlinkResourceManager.class,
+						flinkConfig,
+						yarnConfig,
+						leaderRetrievalService,
+						applicationMasterHostName,
+						webInterfaceURL,
+						taskManagerParameters,
+						taskManagerLaunchContext,
+						yarnHeartbeatIntervalMillis,
+						maxFailedContainers,
+						numInitialTaskManagers,
+						callbackHandler,
+						resourceManagerClient,
+						nodeManagerClient
+					));
+
+				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
+				final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
+
+				leaderGatewayFuture.complete(leader1Gateway);
+				resourceManagerFuture.complete(resourceManagerGateway);
+
+				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
+
+				for (int i = 0; i < containerList.size(); i++) {
+					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+				}
+
+				Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
+
+				Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
+
+				leaderRetrievalService.notifyListener(null, null);
+
+				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
+
+				for (Container container: containerList) {
+					resourceManagerGateway.tell(
+						new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+						leader1Gateway);
+				}
+
+				for (int i = 0; i < containerList.size(); i++) {
+					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+				}
+
+				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.INSTANCE, deadline.timeLeft());
+
+				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+
+				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
+			} finally {
+				if (resourceManager != null) {
+					resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+				}
+			}
+		}};
+	}
+}