You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:11 UTC

[7/8] flink git commit: [FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
new file mode 100644
index 0000000..9452b20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.InetAddress;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the TaskManagerLocation, which identifies the location and connection
+ * information of a TaskManager.
+ */
+public class TaskManagerLocationTest {
+
+	@Test
+	public void testEqualsHashAndCompareTo() {
+		try {
+			ResourceID resourceID1 = new ResourceID("a");
+			ResourceID resourceID2 = new ResourceID("b");
+			ResourceID resourceID3 = new ResourceID("c");
+
+			// we mock the addresses to avoid the cost of reverse name lookups
+			InetAddress address1 = mock(InetAddress.class);
+			when(address1.getCanonicalHostName()).thenReturn("localhost");
+			when(address1.getHostName()).thenReturn("localhost");
+			when(address1.getHostAddress()).thenReturn("127.0.0.1");
+			when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} );
+
+			InetAddress address2 = mock(InetAddress.class);
+			when(address2.getCanonicalHostName()).thenReturn("testhost1");
+			when(address2.getHostName()).thenReturn("testhost1");
+			when(address2.getHostAddress()).thenReturn("0.0.0.0");
+			when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} );
+
+			InetAddress address3 = mock(InetAddress.class);
+			when(address3.getCanonicalHostName()).thenReturn("testhost2");
+			when(address3.getHostName()).thenReturn("testhost2");
+			when(address3.getHostAddress()).thenReturn("192.168.0.1");
+			when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} );
+
+			// one == four != two != three
+			TaskManagerLocation one = new TaskManagerLocation(resourceID1, address1, 19871);
+			TaskManagerLocation two = new TaskManagerLocation(resourceID2, address2, 19871);
+			TaskManagerLocation three = new TaskManagerLocation(resourceID3, address3, 10871);
+			TaskManagerLocation four = new TaskManagerLocation(resourceID1, address1, 19871);
+			
+			assertTrue(one.equals(four));
+			assertTrue(!one.equals(two));
+			assertTrue(!one.equals(three));
+			assertTrue(!two.equals(three));
+			assertTrue(!three.equals(four));
+			
+			assertTrue(one.compareTo(four) == 0);
+			assertTrue(four.compareTo(one) == 0);
+			assertTrue(one.compareTo(two) != 0);
+			assertTrue(one.compareTo(three) != 0);
+			assertTrue(two.compareTo(three) != 0);
+			assertTrue(three.compareTo(four) != 0);
+			
+			{
+				int val = one.compareTo(two);
+				assertTrue(two.compareTo(one) == -val);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialization() {
+		try {
+			// without resolved hostname
+			{
+				TaskManagerLocation original = new TaskManagerLocation(
+						ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888);
+
+				TaskManagerLocation serCopy = InstantiationUtil.clone(original);
+				assertEquals(original, serCopy);
+			}
+						
+			// with resolved hostname
+			{
+				TaskManagerLocation original = new TaskManagerLocation(
+						ResourceID.generate(), InetAddress.getByName("127.0.0.1"), 19871);
+				original.getFQDNHostname();
+
+				TaskManagerLocation serCopy = InstantiationUtil.clone(original);
+				assertEquals(original, serCopy);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGetFQDNHostname() {
+		try {
+			TaskManagerLocation info1 = new TaskManagerLocation(ResourceID.generate(), InetAddress.getByName("127.0.0.1"), 19871);
+			assertNotNull(info1.getFQDNHostname());
+			
+			TaskManagerLocation info2 = new TaskManagerLocation(ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888);
+			assertNotNull(info2.getFQDNHostname());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGetHostname0() {
+		try {
+			InetAddress address = mock(InetAddress.class);
+			when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
+			when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
+			when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+			final TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871);
+			Assert.assertEquals("worker2", info.getHostname());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGetHostname1() {
+		try {
+			InetAddress address = mock(InetAddress.class);
+			when(address.getCanonicalHostName()).thenReturn("worker10");
+			when(address.getHostName()).thenReturn("worker10");
+			when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+			TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871);
+			Assert.assertEquals("worker10", info.getHostname());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGetHostname2() {
+		try {
+			final String addressString = "192.168.254.254";
+
+			// we mock the address to avoid the cost of a reverse name lookup
+			InetAddress address = mock(InetAddress.class);
+			when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
+			when(address.getHostName()).thenReturn("192.168.254.254");
+			when(address.getHostAddress()).thenReturn("192.168.254.254");
+			when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
+
+			TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 54152);
+
+			assertNotNull(info.getFQDNHostname());
+			assertTrue(info.getFQDNHostname().equals(addressString));
+
+			assertNotNull(info.getHostname());
+			assertTrue(info.getHostname().equals(addressString));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 7feb949..f9c9b63 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance._
 import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
@@ -63,8 +64,11 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       val tm1 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
       val tm2 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
 
-      val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
-      val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+      val resourceId1 = ResourceID.generate()
+      val resourceId2 = ResourceID.generate()
+      
+      val connectionInfo1 = new TaskManagerLocation(resourceId1, InetAddress.getLocalHost, 10000)
+      val connectionInfo2 = new TaskManagerLocation(resourceId2, InetAddress.getLocalHost, 10001)
 
       val hardwareDescription = HardwareDescription.extractFromSystem(10)
 
@@ -75,7 +79,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       within(10 seconds) {
         jm.tell(
           RegisterTaskManager(
-            ResourceID.generate(),
+            resourceId1,
             connectionInfo1,
             hardwareDescription,
             1),
@@ -92,7 +96,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       within(10 seconds) {
         jm.tell(
           RegisterTaskManager(
-            ResourceID.generate(),
+            resourceId2,
             connectionInfo2,
             hardwareDescription,
             1),
@@ -118,7 +122,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       val selfGateway = new AkkaActorGateway(testActor, null)
 
       val resourceID = ResourceID.generate()
-      val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
+      val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1)
       val hardwareDescription = HardwareDescription.extractFromSystem(10)
 
       within(20 seconds) {

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 73ab7eb..f9f294b 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManagerConfiguration}
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
 
 /** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
@@ -43,14 +42,14 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   *                              JobManager
   */
 class TestingYarnTaskManager(
-    config: TaskManagerConfiguration,
-    resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
-    memoryManager: MemoryManager,
-    ioManager: IOManager,
-    network: NetworkEnvironment,
-    numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
+                              config: TaskManagerConfiguration,
+                              resourceID: ResourceID,
+                              connectionInfo: TaskManagerLocation,
+                              memoryManager: MemoryManager,
+                              ioManager: IOManager,
+                              network: NetworkEnvironment,
+                              numberOfSlots: Int,
+                              leaderRetrievalService: LeaderRetrievalService)
   extends YarnTaskManager(
     config,
     resourceID,
@@ -65,6 +64,7 @@ class TestingYarnTaskManager(
   object YarnTaskManager {
 
     /** Entry point (main method) to run the TaskManager on YARN.
+      *
       * @param args The command line arguments.
       */
     def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 107801d..0c6264e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.
@@ -32,7 +31,7 @@ import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfigurati
 class YarnTaskManager(
     config: TaskManagerConfiguration,
     resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
+    taskManagerLocation: TaskManagerLocation,
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
@@ -41,7 +40,7 @@ class YarnTaskManager(
   extends TaskManager(
     config,
     resourceID,
-    connectionInfo,
+    taskManagerLocation,
     memoryManager,
     ioManager,
     network,