You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:20 UTC
[19/50] [abbrv] incubator-apex-core git commit: Using
YarnConfiguration getSocketAddr to get the individual rm addresses in HA mode
as it will lookup all the necessary configuration to return the address
namely yarn.resourcemanager.address followed by ya
Using YarnConfiguration.getSocketAddr to get the individual RM addresses in HA mode, as it will look up all the necessary configuration to return the address, namely yarn.resourcemanager.address followed by yarn.resourcemanager.hostname.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45e891c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45e891c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45e891c7
Branch: refs/heads/master
Commit: 45e891c7cd7f83a34a4eefc0de10306ba139d2da
Parents: bf72215
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Sep 25 01:39:48 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Sun Oct 4 09:54:50 2015 -0700
----------------------------------------------------------------------
.../stram/client/StramClientUtils.java | 28 +++++++++++++-------
.../com/datatorrent/stram/util/ConfigUtils.java | 1 +
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d596a73..45d2feb 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -22,10 +22,6 @@ import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
import org.mozilla.javascript.Scriptable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +53,10 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.DTLoggerFactory;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.StramClient;
@@ -246,10 +245,7 @@ public class StramClientUtils
for (String rmId : ConfigUtils.getRMHAIds(conf)) {
LOG.info("Yarn Resource Manager id: {}", rmId);
// Set RM_ID to get the corresponding RM_ADDRESS
- services.add(SecurityUtil.buildTokenService(NetUtils.createSocketAddr(
- conf.get(RM_HOSTNAME_PREFIX + rmId),
- YarnConfiguration.DEFAULT_RM_PORT,
- RM_HOSTNAME_PREFIX + rmId)).toString());
+ services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString());
}
Text rmTokenService = new Text(Joiner.on(',').join(services));
@@ -284,6 +280,20 @@ public class StramClientUtils
credentials.addToken(token.getService(), token);
}
+ public InetSocketAddress getRMHAAddress(String rmId)
+ {
+ YarnConfiguration yarnConf;
+ if (conf instanceof YarnConfiguration) {
+ yarnConf = (YarnConfiguration)conf;
+ } else {
+ yarnConf = new YarnConfiguration(conf);
+ }
+ yarnConf.set(ConfigUtils.RM_HA_ID, rmId);
+ InetSocketAddress socketAddr = yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+ yarnConf.unset(ConfigUtils.RM_HA_ID);
+ return socketAddr;
+ }
+
}
private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
index 481815f..68fe27b 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
@@ -39,6 +39,7 @@ public class ConfigUtils
private static final String RM_HA_PREFIX = YarnConfiguration.RM_PREFIX + "ha.";
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
+ public static final String RM_HA_ID = RM_HA_PREFIX + "id";
public static final boolean DEFAULT_RM_HA_ENABLED = false;
private static String yarnLogDir;