You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/15 17:08:40 UTC
[4/4] flink git commit: [FLINK-1705] [taskmanager] Fix hostname lookup.
[FLINK-1705] [taskmanager] Fix hostname lookup.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5308ac83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5308ac83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5308ac83
Branch: refs/heads/master
Commit: 5308ac8325a5b31627023bfd002a9a3757d15c1f
Parents: 2eea012
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Mar 14 18:20:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 15 16:37:11 2015 +0100
----------------------------------------------------------------------
.../instance/InstanceConnectionInfo.java | 75 +++++++++-------
.../instance/InstanceConnectionInfoTest.java | 95 ++++++++++++++++----
2 files changed, 122 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index a1eec4d..ee79c23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -31,7 +31,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class encapsulates all connection information necessary to connect to the instance's task manager.
+ * This class encapsulates the connection information of a TaskManager.
+ * It describes the host where the TaskManager operates and its server port
+ * for data exchange. This class also contains utilities to work with the
+ * TaskManager's host name, which is used to localize work assignments.
*/
public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, java.io.Serializable {
@@ -56,15 +59,9 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
private String fqdnHostName;
/**
- * The hostname
+ * The hostname, derived from the fully qualified host name.
*/
private String hostName;
-
- /**
- * This flag indicates if the FQDN hostname cound not be resolved and is represented
- * as an IP address (string).
- */
- private boolean fqdnHostNameIsIP = false;
/**
@@ -90,14 +87,24 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
// get FQDN hostname on this TaskManager.
try {
this.fqdnHostName = this.inetAddress.getCanonicalHostName();
- } catch (Throwable t) {
- LOG.warn("Unable to determine hostname for TaskManager. The performance might be degraded since HDFS input split assignment is not possible");
- if(LOG.isDebugEnabled()) {
- LOG.debug("getCanonicalHostName() Exception", t);
- }
- // could not determine host name, so take IP textual representation
- this.fqdnHostName = inetAddress.getHostAddress();
- this.fqdnHostNameIsIP = true;
+ }
+ catch (Throwable t) {
+ LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
+ "for HDFS files) may be non-local when the canonical hostname is missing.");
+ LOG.debug("getCanonicalHostName() Exception:", t);
+ this.fqdnHostName = this.inetAddress.getHostAddress();
+ }
+
+ if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
+ // this happens when the name lookup fails, either due to an exception,
+ // or because no hostname can be found for the address
+ // take IP textual representation
+ this.hostName = this.fqdnHostName;
+ LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
+ + "Local input split assignment (such as for HDFS files) may be impacted.");
+ }
+ else {
+ this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
}
}
@@ -126,27 +133,37 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
}
/**
- * Returns the host name of the instance. If the host name could not be determined, the return value will be a
- * textual representation of the instance's IP address.
+ * Returns the fully-qualified domain name of the TaskManager. If the name could not be
+ * determined, the return value will be a textual representation of the TaskManager's IP address.
*
- * @return the host name of the instance
+ * @return The fully-qualified domain name of the TaskManager.
*/
public String getFQDNHostname() {
return this.fqdnHostName;
}
-
+
+ /**
+ * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
+ * domain name (FQDN, see {@link #getFQDNHostname()}):
+ * <ul>
+ * <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li>
+ * <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is
+ * used as the hostname.</li>
+ * <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first
+ * segment (here "worker3") will be used as the hostname.</li>
+ * </ul>
+ *
+ * @return The hostname of the TaskManager.
+ */
public String getHostname() {
- if(hostName == null) {
- String fqdn = getFQDNHostname();
- if(this.fqdnHostNameIsIP) { // fqdn to hostname translation is pointless if FQDN is an ip address.
- hostName = fqdn;
- } else {
- hostName = NetUtils.getHostnameFromFQDN(fqdn);
- }
- }
return hostName;
}
+ /**
+ * Gets the IP address where the TaskManager operates.
+ *
+ * @return The IP address.
+ */
public String getInetAdress() {
return this.inetAddress.toString();
}
@@ -166,7 +183,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
this.fqdnHostName = StringUtils.readNullableString(in);
this.hostName = StringUtils.readNullableString(in);
- this.fqdnHostNameIsIP = in.readBoolean();
try {
this.inetAddress = InetAddress.getByAddress(address);
@@ -185,7 +201,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
StringUtils.writeNullableString(fqdnHostName, out);
StringUtils.writeNullableString(hostName, out);
- out.writeBoolean(fqdnHostNameIsIP);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index c072e59..2769183 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.instance;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -27,19 +28,43 @@ import java.net.InetAddress;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.reflect.Whitebox;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the InstanceConnectionInfo, which identifies the location and connection
+ * information of a TaskManager.
+ */
public class InstanceConnectionInfoTest {
@Test
public void testEqualsHashAndCompareTo() {
try {
+ // we mock the addresses to save the time of the reverse name lookups
+ InetAddress address1 = mock(InetAddress.class);
+ when(address1.getCanonicalHostName()).thenReturn("localhost");
+ when(address1.getHostName()).thenReturn("localhost");
+ when(address1.getHostAddress()).thenReturn("127.0.0.1");
+ when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} );
+
+ InetAddress address2 = mock(InetAddress.class);
+ when(address2.getCanonicalHostName()).thenReturn("testhost1");
+ when(address2.getHostName()).thenReturn("testhost1");
+ when(address2.getHostAddress()).thenReturn("0.0.0.0");
+ when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} );
+
+ InetAddress address3 = mock(InetAddress.class);
+ when(address3.getCanonicalHostName()).thenReturn("testhost2");
+ when(address3.getHostName()).thenReturn("testhost2");
+ when(address3.getHostAddress()).thenReturn("192.168.0.1");
+ when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} );
+
// one == four != two != three
- InstanceConnectionInfo one = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
- InstanceConnectionInfo two = new InstanceConnectionInfo(InetAddress.getByName("0.0.0.0"), 19871);
- InstanceConnectionInfo three = new InstanceConnectionInfo(InetAddress.getByName("192.168.0.1"), 10871);
- InstanceConnectionInfo four = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
+ InstanceConnectionInfo one = new InstanceConnectionInfo(address1, 19871);
+ InstanceConnectionInfo two = new InstanceConnectionInfo(address2, 19871);
+ InstanceConnectionInfo three = new InstanceConnectionInfo(address3, 10871);
+ InstanceConnectionInfo four = new InstanceConnectionInfo(address1, 19871);
assertTrue(one.equals(four));
assertTrue(!one.equals(two));
@@ -101,10 +126,10 @@ public class InstanceConnectionInfoTest {
public void testGetFQDNHostname() {
try {
InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
- assertTrue(info1.getFQDNHostname() != null);
+ assertNotNull(info1.getFQDNHostname());
InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
- assertTrue(info2.getFQDNHostname() != null);
+ assertNotNull(info2.getFQDNHostname());
}
catch (Exception e) {
e.printStackTrace();
@@ -115,10 +140,15 @@ public class InstanceConnectionInfoTest {
@Test
public void testGetHostname0() {
try {
- final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871));
- Whitebox.setInternalState(info1, "fqdnHostName", "worker2.cluster.mycompany.com");
- Assert.assertEquals("worker2", info1.getHostname());
- } catch (Exception e) {
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
+ when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
+ when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+ final InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
+ Assert.assertEquals("worker2", info.getHostname());
+ }
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -127,14 +157,43 @@ public class InstanceConnectionInfoTest {
@Test
public void testGetHostname1() {
try {
- final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871));
- Whitebox.setInternalState(info1, "fqdnHostName", "worker10");
- Assert.assertEquals("worker10", info1.getHostname());
- } catch (Exception e) {
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("worker10");
+ when(address.getHostName()).thenReturn("worker10");
+ when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+ InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
+ Assert.assertEquals("worker10", info.getHostname());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetHostname2() {
+ try {
+ final String addressString = "192.168.254.254";
+
+ // we mock the addresses to save the time of the reverse name lookups
+ InetAddress address = mock(InetAddress.class);
+ when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
+ when(address.getHostName()).thenReturn("192.168.254.254");
+ when(address.getHostAddress()).thenReturn("192.168.254.254");
+ when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
+
+ InstanceConnectionInfo info = new InstanceConnectionInfo(address, 54152);
+
+ assertNotNull(info.getFQDNHostname());
+ assertTrue(info.getFQDNHostname().equals(addressString));
+
+ assertNotNull(info.getHostname());
+ assertTrue(info.getHostname().equals(addressString));
+ }
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
-
-
}