You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:20 UTC

[19/50] [abbrv] incubator-apex-core git commit: Using YarnConfiguration getSocketAddr to get the individual rm addresses in HA mode as it will lookup all the necessary configuration to return the address namely yarn.resourcemanager.address followed by ya

Use YarnConfiguration.getSocketAddr to get the individual RM addresses in HA mode, since it looks up all the configuration needed to return the address: yarn.resourcemanager.address first, followed by yarn.resourcemanager.hostname.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45e891c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45e891c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45e891c7

Branch: refs/heads/master
Commit: 45e891c7cd7f83a34a4eefc0de10306ba139d2da
Parents: bf72215
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Sep 25 01:39:48 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Sun Oct 4 09:54:50 2015 -0700

----------------------------------------------------------------------
 .../stram/client/StramClientUtils.java          | 28 +++++++++++++-------
 .../com/datatorrent/stram/util/ConfigUtils.java |  1 +
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d596a73..45d2feb 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -22,10 +22,6 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
 import org.mozilla.javascript.Scriptable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +53,10 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.DTLoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.StreamingApplication;
 
 import com.datatorrent.stram.StramClient;
@@ -246,10 +245,7 @@ public class StramClientUtils
       for (String rmId : ConfigUtils.getRMHAIds(conf)) {
         LOG.info("Yarn Resource Manager id: {}", rmId);
         // Set RM_ID to get the corresponding RM_ADDRESS
-        services.add(SecurityUtil.buildTokenService(NetUtils.createSocketAddr(
-                conf.get(RM_HOSTNAME_PREFIX + rmId),
-                YarnConfiguration.DEFAULT_RM_PORT,
-                RM_HOSTNAME_PREFIX + rmId)).toString());
+        services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString());
       }
       Text rmTokenService = new Text(Joiner.on(',').join(services));
 
@@ -284,6 +280,20 @@ public class StramClientUtils
       credentials.addToken(token.getService(), token);
     }
 
+    public InetSocketAddress getRMHAAddress(String rmId)
+    {
+      YarnConfiguration yarnConf;
+      if (conf instanceof YarnConfiguration) {
+        yarnConf = (YarnConfiguration)conf;
+      } else {
+        yarnConf = new YarnConfiguration(conf);
+      }
+      yarnConf.set(ConfigUtils.RM_HA_ID, rmId);
+      InetSocketAddress socketAddr = yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+      yarnConf.unset(ConfigUtils.RM_HA_ID);
+      return socketAddr;
+    }
+
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
index 481815f..68fe27b 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
@@ -39,6 +39,7 @@ public class ConfigUtils
   private static final String RM_HA_PREFIX = YarnConfiguration.RM_PREFIX + "ha.";
   public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
   public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
+  public static final String RM_HA_ID = RM_HA_PREFIX + "id";
   public static final boolean DEFAULT_RM_HA_ENABLED = false;
 
   private static String yarnLogDir;