You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/15 17:08:40 UTC

[4/4] flink git commit: [FLINK-1705] [taskmanager] Fix hostname lookup.

[FLINK-1705] [taskmanager] Fix hostname lookup.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5308ac83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5308ac83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5308ac83

Branch: refs/heads/master
Commit: 5308ac8325a5b31627023bfd002a9a3757d15c1f
Parents: 2eea012
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Mar 14 18:20:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 15 16:37:11 2015 +0100

----------------------------------------------------------------------
 .../instance/InstanceConnectionInfo.java        | 75 +++++++++-------
 .../instance/InstanceConnectionInfoTest.java    | 95 ++++++++++++++++----
 2 files changed, 122 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index a1eec4d..ee79c23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -31,7 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class encapsulates all connection information necessary to connect to the instance's task manager.
+ * This class encapsulates the connection information of a TaskManager.
+ * It describes the host where the TaskManager operates and its server port
+ * for data exchange. This class also contains utilities to work with the
+ * TaskManager's host name, which is used to localize work assignments.
  */
 public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, java.io.Serializable {
 
@@ -56,15 +59,9 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	private String fqdnHostName;
 	
 	/**
-	 * The hostname
+	 * The hostname, derived from the fully qualified host name.
 	 */
 	private String hostName;
-	
-	/**
-	 * This flag indicates if the FQDN hostname cound not be resolved and is represented
-	 * as an IP address (string).
-	 */
-	private boolean fqdnHostNameIsIP = false;
 
 
 	/**
@@ -90,14 +87,24 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		// get FQDN hostname on this TaskManager.
 		try {
 			this.fqdnHostName = this.inetAddress.getCanonicalHostName();
-		} catch (Throwable t) {
-			LOG.warn("Unable to determine hostname for TaskManager. The performance might be degraded since HDFS input split assignment is not possible");
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("getCanonicalHostName() Exception", t);
-			}
-			// could not determine host name, so take IP textual representation
-			this.fqdnHostName = inetAddress.getHostAddress();
-			this.fqdnHostNameIsIP = true;
+		}
+		catch (Throwable t) {
+			LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
+					"for HDFS files) may be non-local when the canonical hostname is missing.");
+			LOG.debug("getCanonicalHostName() Exception:", t);
+			this.fqdnHostName = this.inetAddress.getHostAddress();
+		}
+
+		if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
+			// this happens when the name lookup fails, either due to an exception,
+			// or because no hostname can be found for the address
+			// take IP textual representation
+			this.hostName = this.fqdnHostName;
+			LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
+					+ "Local input split assignment (such as for HDFS files) may be impacted.");
+		}
+		else {
+			this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
 		}
 	}
 
@@ -126,27 +133,37 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	}
 
 	/**
-	 * Returns the host name of the instance. If the host name could not be determined, the return value will be a
-	 * textual representation of the instance's IP address.
+	 * Returns the fully-qualified domain name the TaskManager. If the name could not be
+	 * determined, the return value will be a textual representation of the TaskManager's IP address.
 	 * 
-	 * @return the host name of the instance
+	 * @return The fully-qualified domain name of the TaskManager.
 	 */
 	public String getFQDNHostname() {
 		return this.fqdnHostName;
 	}
-	
+
+	/**
+	 * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
+	 * domain name (FQDN, see {@link #getFQDNHostname()}):
+	 * <ul>
+	 *     <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li>
+	 *     <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is
+	 *         used as the hostname.</li>
+	 *     <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first
+	 *         segment (here "worker3") will be used as the hostname.</li>
+	 * </ul>
+	 *
+	 * @return The hostname of the TaskManager.
+	 */
 	public String getHostname() {
-		if(hostName == null) {
-			String fqdn = getFQDNHostname();
-			if(this.fqdnHostNameIsIP) { // fqdn to hostname translation is pointless if FQDN is an ip address.
-				hostName = fqdn;
-			} else {
-				hostName = NetUtils.getHostnameFromFQDN(fqdn);
-			}
-		}
 		return hostName;
 	}
 
+	/**
+	 * Gets the IP address where the TaskManager operates.
+	 *
+	 * @return The IP address.
+	 */
 	public String getInetAdress() {
 		return this.inetAddress.toString();
 	}
@@ -166,7 +183,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		
 		this.fqdnHostName = StringUtils.readNullableString(in);
 		this.hostName = StringUtils.readNullableString(in);
-		this.fqdnHostNameIsIP = in.readBoolean();
 
 		try {
 			this.inetAddress = InetAddress.getByAddress(address);
@@ -185,7 +201,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		
 		StringUtils.writeNullableString(fqdnHostName, out);
 		StringUtils.writeNullableString(hostName, out);
-		out.writeBoolean(fqdnHostNameIsIP);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index c072e59..2769183 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -27,19 +28,43 @@ import java.net.InetAddress;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.reflect.Whitebox;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the InstanceConnectionInfo, which identifies the location and connection
+ * information of a TaskManager.
+ */
 public class InstanceConnectionInfoTest {
 
 	@Test
 	public void testEqualsHashAndCompareTo() {
 		try {
+			// we mock the addresses to save the times of the reverse name lookups
+			InetAddress address1 = mock(InetAddress.class);
+			when(address1.getCanonicalHostName()).thenReturn("localhost");
+			when(address1.getHostName()).thenReturn("localhost");
+			when(address1.getHostAddress()).thenReturn("127.0.0.1");
+			when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} );
+
+			InetAddress address2 = mock(InetAddress.class);
+			when(address2.getCanonicalHostName()).thenReturn("testhost1");
+			when(address2.getHostName()).thenReturn("testhost1");
+			when(address2.getHostAddress()).thenReturn("0.0.0.0");
+			when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} );
+
+			InetAddress address3 = mock(InetAddress.class);
+			when(address3.getCanonicalHostName()).thenReturn("testhost2");
+			when(address3.getHostName()).thenReturn("testhost2");
+			when(address3.getHostAddress()).thenReturn("192.168.0.1");
+			when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} );
+
 			// one == four != two != three
-			InstanceConnectionInfo one = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
-			InstanceConnectionInfo two = new InstanceConnectionInfo(InetAddress.getByName("0.0.0.0"), 19871);
-			InstanceConnectionInfo three = new InstanceConnectionInfo(InetAddress.getByName("192.168.0.1"), 10871);
-			InstanceConnectionInfo four = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
+			InstanceConnectionInfo one = new InstanceConnectionInfo(address1, 19871);
+			InstanceConnectionInfo two = new InstanceConnectionInfo(address2, 19871);
+			InstanceConnectionInfo three = new InstanceConnectionInfo(address3, 10871);
+			InstanceConnectionInfo four = new InstanceConnectionInfo(address1, 19871);
 			
 			assertTrue(one.equals(four));
 			assertTrue(!one.equals(two));
@@ -101,10 +126,10 @@ public class InstanceConnectionInfoTest {
 	public void testGetFQDNHostname() {
 		try {
 			InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
-			assertTrue(info1.getFQDNHostname() != null);
+			assertNotNull(info1.getFQDNHostname());
 			
 			InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
-			assertTrue(info2.getFQDNHostname() != null);
+			assertNotNull(info2.getFQDNHostname());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -115,10 +140,15 @@ public class InstanceConnectionInfoTest {
 	@Test
 	public void testGetHostname0() {
 		try {
-			final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871));
-			Whitebox.setInternalState(info1, "fqdnHostName", "worker2.cluster.mycompany.com");
-			Assert.assertEquals("worker2", info1.getHostname());
-		} catch (Exception e) {
+			InetAddress address = mock(InetAddress.class);
+			when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
+			when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
+			when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+			final InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
+			Assert.assertEquals("worker2", info.getHostname());
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -127,14 +157,43 @@ public class InstanceConnectionInfoTest {
 	@Test
 	public void testGetHostname1() {
 		try {
-			final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871));
-			Whitebox.setInternalState(info1, "fqdnHostName", "worker10");
-			Assert.assertEquals("worker10", info1.getHostname());
-		} catch (Exception e) {
+			InetAddress address = mock(InetAddress.class);
+			when(address.getCanonicalHostName()).thenReturn("worker10");
+			when(address.getHostName()).thenReturn("worker10");
+			when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+			InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
+			Assert.assertEquals("worker10", info.getHostname());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGetHostname2() {
+		try {
+			final String addressString = "192.168.254.254";
+
+			// we mock the addresses to save the times of the reverse name lookups
+			InetAddress address = mock(InetAddress.class);
+			when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
+			when(address.getHostName()).thenReturn("192.168.254.254");
+			when(address.getHostAddress()).thenReturn("192.168.254.254");
+			when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
+
+			InstanceConnectionInfo info = new InstanceConnectionInfo(address, 54152);
+
+			assertNotNull(info.getFQDNHostname());
+			assertTrue(info.getFQDNHostname().equals(addressString));
+
+			assertNotNull(info.getHostname());
+			assertTrue(info.getHostname().equals(addressString));
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
-	
 }