You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/10/08 02:39:39 UTC

[1/2] incubator-apex-core git commit: APEX-181 expose getting of rm web app address

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 e5a01842b -> 423402420


APEX-181 expose getting of rm web app address


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/12af6140
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/12af6140
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/12af6140

Branch: refs/heads/devel-3
Commit: 12af6140fe89c688b9add8dfe07f27fcd4c8ffa3
Parents: b986f70
Author: David Yan <da...@datatorrent.com>
Authored: Mon Oct 5 13:08:03 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Wed Oct 7 16:48:28 2015 -0700

----------------------------------------------------------------------
 .../stram/client/StramClientUtils.java          | 45 +++++++++++
 .../security/StramWSFilterInitializer.java      | 50 ++----------
 .../stram/client/StramClientUtilsTest.java      | 81 ++++++++++++++++++++
 3 files changed, 134 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/12af6140/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d679da4..04a3484 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -34,9 +34,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -732,4 +734,47 @@ public class StramClientUtils
     return null;
   }
 
+  public static InetSocketAddress getRMWebAddress(Configuration conf, String rmId)
+  {
+    // Read the Hadoop SSL-enabled flag from conf, then defer to the three-argument overload.
+    return getRMWebAddress(conf, conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT), rmId);
+  }
+
+  public static InetSocketAddress getRMWebAddress(Configuration conf, boolean sslEnabled, String rmId)
+  {
+    // A null or empty rmId selects the plain property; otherwise the per-RM variant "<property>.<rmId>" is read.
+    String key = (sslEnabled ? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS : YarnConfiguration.RM_WEBAPP_ADDRESS) + ((rmId == null || rmId.isEmpty()) ? "" : ("." + rmId));
+    InetSocketAddress address = sslEnabled
+        ? conf.getSocketAddr(key, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)
+        : conf.getSocketAddr(key, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
+    LOG.info("rm webapp address setting {}", address);
+    // Log the sources of the key that was actually read; Arrays.toString stops the String[] from being spread as varargs.
+    LOG.debug("rm setting sources {}", java.util.Arrays.toString(conf.getPropertySources(key)));
+    InetSocketAddress resolvedSocketAddress = NetUtils.getConnectAddress(address);
+    InetAddress resolved = resolvedSocketAddress.getAddress();
+    // An unresolved, wildcard or loopback address is replaced by this host's canonical name, keeping the port.
+    if (resolved == null || resolved.isAnyLocalAddress() || resolved.isLoopbackAddress()) {
+      try {
+        resolvedSocketAddress = InetSocketAddress.createUnresolved(InetAddress.getLocalHost().getCanonicalHostName(), address.getPort());
+      } catch (UnknownHostException e) {
+        LOG.debug("local host name lookup failed, keeping {}", resolvedSocketAddress, e);
+      }
+    }
+    return resolvedSocketAddress;
+  }
+
+  public static String getSocketConnectString(InetSocketAddress socketAddress)
+  {
+    // Formats the address as "host:port". An unresolved address contributes its host string as is;
+    // a wildcard or loopback address contributes its canonical host name; any other address
+    // contributes its host name.
+    InetAddress inet = socketAddress.getAddress();
+    if (inet == null) {
+      return socketAddress.getHostString() + ":" + socketAddress.getPort();
+    }
+    boolean wildcardOrLoopback = inet.isAnyLocalAddress() || inet.isLoopbackAddress();
+    String hostName = wildcardOrLoopback ? inet.getCanonicalHostName() : inet.getHostName();
+    return hostName + ":" + socketAddress.getPort();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/12af6140/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
index a2b2821..e9870c6 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
@@ -18,24 +18,21 @@
  */
 package com.datatorrent.stram.security;
 
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.http.FilterContainer;
 import org.apache.hadoop.http.FilterInitializer;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.util.ConfigUtils;
 
 /**
@@ -100,51 +97,20 @@ public class StramWSFilterInitializer extends FilterInitializer
     Replace with methods from Hadoop when HA support is available
     HttpConfig is not used as it's audience is private as well and it's interface has changed from Hadoop 2.2 to 2.6
   */
-  public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, String rmId) {
-    boolean sslEnabled = conf.getBoolean(
-            CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
-            CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
-    return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : "");
+  public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, String rmId)
+  {
+    final InetSocketAddress rmWebAddress = StramClientUtils.getRMWebAddress(conf, rmId);
+    return StramClientUtils.getSocketConnectString(rmWebAddress);
   }
 
   /*
     From org.apache.hadoop.yarn.webapp.util.WebAppUtils
     Modified for HA support
   */
-  public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmPrpKey)
+  public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmId)
   {
-    InetSocketAddress address = null;
-    if (sslEnabled) {
-      address =
-              conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmPrpKey,
-                      YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
-                      YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
-    } else {
-      address =
-              conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmPrpKey,
-                      YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
-                      YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
-    }
-    logger.info("rm webapp address setting {}", address);
-    logger.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS));
-    address = NetUtils.getConnectAddress(address);
-    StringBuffer sb = new StringBuffer();
-    InetAddress resolved = address.getAddress();
-    if (resolved == null || resolved.isAnyLocalAddress() ||
-            resolved.isLoopbackAddress()) {
-      String lh = address.getHostName();
-      try {
-        lh = InetAddress.getLocalHost().getCanonicalHostName();
-      } catch (UnknownHostException e) {
-        //Ignore and fallback.
-      }
-      sb.append(lh);
-    } else {
-      sb.append(address.getHostName());
-    }
-    sb.append(":").append(address.getPort());
-    logger.info("rm webapp resolved hostname {}", sb.toString());
-    return sb.toString();
+    final InetSocketAddress rmWebAddress = StramClientUtils.getRMWebAddress(conf, sslEnabled, rmId);
+    return StramClientUtils.getSocketConnectString(rmWebAddress);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/12af6140/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
new file mode 100644
index 0000000..2392b47
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.client;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.datatorrent.stram.util.ConfigUtils;
+
+
+/**
+ * Unit tests for StramClientUtils
+ */
+public class StramClientUtilsTest
+{
+  // Expected host rendering for a resolvable host: canonical name for wildcard/loopback, host name otherwise.
+  private String getHostString(String host) throws UnknownHostException
+  {
+    InetAddress address = InetAddress.getByName(host);
+    if (address.isAnyLocalAddress() || address.isLoopbackAddress()) {
+      return address.getCanonicalHostName();
+    } else {
+      return address.getHostName();
+    }
+  }
+
+  @Test
+  public void testRMWebAddress() throws UnknownHostException
+  {
+    Configuration conf = new Configuration(false);
+
+    // basic test
+    conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, false);
+    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "192.168.1.1:8032");
+    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "192.168.1.2:8032");
+    Assert.assertEquals(getHostString("192.168.1.1") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null)));
+    conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, true);
+    Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null)));
+
+    // set localhost if host is unknown
+    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "someunknownhost:8032");
+    // SSL is still enabled at this point, so the HTTPS address is the one being looked up
+    Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName() + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null)));
+
+    // set localhost
+    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "127.0.0.1:8032");
+    Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName() + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, null)));
+
+    // test when HA is enabled
+    conf.setBoolean(ConfigUtils.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm1", "192.168.1.1:8032");
+    conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + ".rm2", "192.168.1.2:8032");
+    Assert.assertEquals(getHostString("192.168.1.1") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, "rm1")));
+    Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(StramClientUtils.getRMWebAddress(conf, "rm2")));
+
+  }
+
+}


[2/2] incubator-apex-core git commit: Merge branch 'APEX-181' of github.com:davidyan74/incubator-apex-core into devel-3

Posted by pr...@apache.org.
Merge branch 'APEX-181' of github.com:davidyan74/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/42340242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/42340242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/42340242

Branch: refs/heads/devel-3
Commit: 423402420dd616d35ba2e7f445cf98f1995cb6a9
Parents: e5a0184 12af614
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Oct 7 17:26:21 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Oct 7 17:26:21 2015 -0700

----------------------------------------------------------------------
 .../stram/client/StramClientUtils.java          | 45 +++++++++++
 .../security/StramWSFilterInitializer.java      | 50 ++----------
 .../stram/client/StramClientUtilsTest.java      | 81 ++++++++++++++++++++
 3 files changed, 134 insertions(+), 42 deletions(-)
----------------------------------------------------------------------