You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:11 UTC
[7/8] flink git commit: [FLINK-4490] [distributed coordination] (part
1) Change InstanceConnectionInfo to TaskManagerLocation
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
new file mode 100644
index 0000000..9452b20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.InetAddress;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the TaskManagerLocation, which identifies the location and connection
+ * information of a TaskManager.
+ */
+public class TaskManagerLocationTest {
+
+ @Test
+ public void testEqualsHashAndCompareTo() {
+ try {
+ ResourceID resourceID1 = new ResourceID("a");
+ ResourceID resourceID2 = new ResourceID("b");
+ ResourceID resourceID3 = new ResourceID("c");
+
+ // we mock the addresses to save the times of the reverse name lookups
+ InetAddress address1 = mock(InetAddress.class);
+ when(address1.getCanonicalHostName()).thenReturn("localhost");
+ when(address1.getHostName()).thenReturn("localhost");
+ when(address1.getHostAddress()).thenReturn("127.0.0.1");
+ when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} );
+
+ InetAddress address2 = mock(InetAddress.class);
+ when(address2.getCanonicalHostName()).thenReturn("testhost1");
+ when(address2.getHostName()).thenReturn("testhost1");
+ when(address2.getHostAddress()).thenReturn("0.0.0.0");
+ when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} );
+
+ InetAddress address3 = mock(InetAddress.class);
+ when(address3.getCanonicalHostName()).thenReturn("testhost2");
+ when(address3.getHostName()).thenReturn("testhost2");
+ when(address3.getHostAddress()).thenReturn("192.168.0.1");
+ when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} );
+
+ // one == four != two != three
+ TaskManagerLocation one = new TaskManagerLocation(resourceID1, address1, 19871);
+ TaskManagerLocation two = new TaskManagerLocation(resourceID2, address2, 19871);
+ TaskManagerLocation three = new TaskManagerLocation(resourceID3, address3, 10871);
+ TaskManagerLocation four = new TaskManagerLocation(resourceID1, address1, 19871);
+
+ assertTrue(one.equals(four));
+ assertTrue(!one.equals(two));
+ assertTrue(!one.equals(three));
+ assertTrue(!two.equals(three));
+ assertTrue(!three.equals(four));
+
+ assertTrue(one.compareTo(four) == 0);
+ assertTrue(four.compareTo(one) == 0);
+ assertTrue(one.compareTo(two) != 0);
+ assertTrue(one.compareTo(three) != 0);
+ assertTrue(two.compareTo(three) != 0);
+ assertTrue(three.compareTo(four) != 0);
+
+ {
+ int val = one.compareTo(two);
+ assertTrue(two.compareTo(one) == -val);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialization() {
+ try {
+ // without resolved hostname
+ {
+ TaskManagerLocation original = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888);
+
+ TaskManagerLocation serCopy = InstantiationUtil.clone(original);
+ assertEquals(original, serCopy);
+ }
+
+ // with resolved hostname
+ {
+ TaskManagerLocation original = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getByName("127.0.0.1"), 19871);
+ original.getFQDNHostname();
+
+ TaskManagerLocation serCopy = InstantiationUtil.clone(original);
+ assertEquals(original, serCopy);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetFQDNHostname() {
+ try {
+ TaskManagerLocation info1 = new TaskManagerLocation(ResourceID.generate(), InetAddress.getByName("127.0.0.1"), 19871);
+ assertNotNull(info1.getFQDNHostname());
+
+ TaskManagerLocation info2 = new TaskManagerLocation(ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888);
+ assertNotNull(info2.getFQDNHostname());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetHostname0() {
+ try {
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
+ when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
+ when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+ final TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871);
+ Assert.assertEquals("worker2", info.getHostname());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetHostname1() {
+ try {
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("worker10");
+ when(address.getHostName()).thenReturn("worker10");
+ when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+ TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871);
+ Assert.assertEquals("worker10", info.getHostname());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetHostname2() {
+ try {
+ final String addressString = "192.168.254.254";
+
+ // we mock the addresses to save the times of the reverse name lookups
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
+ when(address.getHostName()).thenReturn("192.168.254.254");
+ when(address.getHostAddress()).thenReturn("192.168.254.254");
+ when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
+
+ TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 54152);
+
+ assertNotNull(info.getFQDNHostname());
+ assertTrue(info.getFQDNHostname().equals(addressString));
+
+ assertNotNull(info.getHostname());
+ assertTrue(info.getHostname().equals(addressString));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 7feb949..f9c9b63 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance._
import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.runtime.util.LeaderRetrievalUtils
@@ -63,8 +64,11 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
val tm1 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
val tm2 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
- val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
- val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+ val resourceId1 = ResourceID.generate()
+ val resourceId2 = ResourceID.generate()
+
+ val connectionInfo1 = new TaskManagerLocation(resourceId1, InetAddress.getLocalHost, 10000)
+ val connectionInfo2 = new TaskManagerLocation(resourceId2, InetAddress.getLocalHost, 10001)
val hardwareDescription = HardwareDescription.extractFromSystem(10)
@@ -75,7 +79,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
within(10 seconds) {
jm.tell(
RegisterTaskManager(
- ResourceID.generate(),
+ resourceId1,
connectionInfo1,
hardwareDescription,
1),
@@ -92,7 +96,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
within(10 seconds) {
jm.tell(
RegisterTaskManager(
- ResourceID.generate(),
+ resourceId2,
connectionInfo2,
hardwareDescription,
1),
@@ -118,7 +122,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
val selfGateway = new AkkaActorGateway(testActor, null)
val resourceID = ResourceID.generate()
- val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
+ val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1)
val hardwareDescription = HardwareDescription.extractFromSystem(10)
within(20 seconds) {
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 73ab7eb..f9f294b 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -19,12 +19,11 @@
package org.apache.flink.yarn
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManagerConfiguration}
import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
@@ -43,14 +42,14 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
* JobManager
*/
class TestingYarnTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- connectionInfo: InstanceConnectionInfo,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService)
+ config: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService)
extends YarnTaskManager(
config,
resourceID,
@@ -65,6 +64,7 @@ class TestingYarnTaskManager(
object YarnTaskManager {
/** Entry point (main method) to run the TaskManager on YARN.
+ *
* @param args The command line arguments.
*/
def main(args: Array[String]): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 107801d..0c6264e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -19,12 +19,11 @@
package org.apache.flink.yarn
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
/** An extension of the TaskManager that listens for additional YARN related
* messages.
@@ -32,7 +31,7 @@ import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfigurati
class YarnTaskManager(
config: TaskManagerConfiguration,
resourceID: ResourceID,
- connectionInfo: InstanceConnectionInfo,
+ taskManagerLocation: TaskManagerLocation,
memoryManager: MemoryManager,
ioManager: IOManager,
network: NetworkEnvironment,
@@ -41,7 +40,7 @@ class YarnTaskManager(
extends TaskManager(
config,
resourceID,
- connectionInfo,
+ taskManagerLocation,
memoryManager,
ioManager,
network,