You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:06 UTC
[05/50] [abbrv] incubator-apex-core git commit: APEX-149 #resolve
Fixed the property name used to lookup RM webapp address in non-HA mode
APEX-149 #resolve Fixed the property name used to lookup RM webapp address in non-HA mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0a89c83c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0a89c83c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0a89c83c
Branch: refs/heads/master
Commit: 0a89c83c48c207cd282f264a8bf515621768eceb
Parents: 0a85586
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Sat Sep 26 10:58:59 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Sep 26 11:02:17 2015 -0700
----------------------------------------------------------------------
.../stram/security/StramWSFilter.java | 47 +++++++++++++++-----
.../security/StramWSFilterInitializer.java | 19 ++++++--
2 files changed, 50 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0a89c83c/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
index 8be7fed..061bdc7 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import com.datatorrent.stram.webapp.WebServices;
@@ -102,6 +103,7 @@ public class StramWSFilter implements Filter
proxyAddresses = new HashSet<String>();
for (String proxyHost : proxyHosts) {
try {
+ logger.debug("resolving proxy hostname {}", proxyHost);
for (InetAddress add : InetAddress.getAllByName(proxyHost)) {
logger.debug("proxy address is: {}", add.getHostAddress());
proxyAddresses.add(add.getHostAddress());
@@ -133,9 +135,8 @@ public class StramWSFilter implements Filter
HttpServletRequest httpReq = (HttpServletRequest)req;
HttpServletResponse httpResp = (HttpServletResponse)resp;
- logger.debug("Remote address for request is: {}", httpReq.getRemoteAddr());
+ String remoteAddr = httpReq.getRemoteAddr();
String requestURI = httpReq.getRequestURI();
- logger.debug("Request path {}", requestURI);
boolean authenticate = true;
String user = null;
if(getProxyAddresses().contains(httpReq.getRemoteAddr())) {
@@ -149,9 +150,11 @@ public class StramWSFilter implements Filter
}
if (requestURI.equals(WebServices.PATH) && (user != null)) {
String token = createClientToken(user, httpReq.getLocalAddr());
- logger.debug("Create token {}", token);
+ logger.debug("{}: creating token {}", remoteAddr, token);
Cookie cookie = new Cookie(CLIENT_COOKIE, token);
httpResp.addCookie(cookie);
+ } else {
+ logger.info("{}: proxy access to URI {} by user {}, no cookie created", remoteAddr, requestURI, user);
}
authenticate = false;
}
@@ -167,19 +170,24 @@ public class StramWSFilter implements Filter
}
boolean valid = false;
if (cookie != null) {
- logger.debug("Verifying token {}", cookie.getValue());
- user = verifyClientToken(cookie.getValue());
- valid = true;
- logger.debug("Token valid");
+ user = verifyClientToken(cookie.getValue(), remoteAddr);
+ if (user != null) {
+ valid = true;
+ } else {
+ logger.debug("{}: invalid cookie {}", remoteAddr, cookie.getValue());
+ }
+ } else {
+ logger.debug("{}: cookie not found {}", remoteAddr, CLIENT_COOKIE);
}
if (!valid) {
+ logger.debug("{}: auth failure", remoteAddr);
httpResp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
return;
}
}
if(user == null) {
- logger.debug("Could not find {} cookie, so user will not be set", WEBAPP_PROXY_USER);
+ logger.debug("{}: could not find user, so user principal will not be set", remoteAddr);
chain.doFilter(req, resp);
} else {
final StramWSPrincipal principal = new StramWSPrincipal(user);
@@ -199,16 +207,31 @@ public class StramWSFilter implements Filter
return token.encodeToUrlString();
}
- private String verifyClientToken(String tokenstr) throws IOException
+ private String verifyClientToken(String tokenstr, String cid) throws IOException
{
Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>();
- token.decodeFromUrlString(tokenstr);
+ try {
+ token.decodeFromUrlString(tokenstr);
+ } catch (IOException e) {
+ logger.debug("{}: error decoding token: {}", cid, e.getMessage());
+ return null;
+ }
byte[] identifier = token.getIdentifier();
byte[] password = token.getPassword();
StramDelegationTokenIdentifier tokenIdentifier = new StramDelegationTokenIdentifier();
DataInputStream input = new DataInputStream(new ByteArrayInputStream(identifier));
- tokenIdentifier.readFields(input);
- tokenManager.verifyToken(tokenIdentifier, password);
+ try {
+ tokenIdentifier.readFields(input);
+ } catch (IOException e) {
+ logger.debug("{}: error decoding identifier: {}", cid, e.getMessage());
+ return null;
+ }
+ try {
+ tokenManager.verifyToken(tokenIdentifier, password);
+ } catch (SecretManager.InvalidToken e) {
+ logger.debug("{}: invalid token {}: {}", cid, tokenIdentifier, e.getMessage());
+ return null;
+ }
return tokenIdentifier.getOwner().toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0a89c83c/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
index f4f8d22..a2b2821 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.http.FilterInitializer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datatorrent.stram.util.ConfigUtils;
/**
@@ -43,12 +46,15 @@ import com.datatorrent.stram.util.ConfigUtils;
*/
public class StramWSFilterInitializer extends FilterInitializer
{
+ private static final Logger logger = LoggerFactory.getLogger(StramWSFilterInitializer.class);
+
private static final String FILTER_NAME = "AM_PROXY_FILTER";
private static final String FILTER_CLASS = StramWSFilter.class.getCanonicalName();
@Override
public void initFilter(FilterContainer container, Configuration conf)
{
+ logger.debug("Conf {}", conf);
Map<String, String> params = new HashMap<String, String>();
Collection<String> proxies = new ArrayList<String>();
if (ConfigUtils.isRMHAEnabled(conf)) {
@@ -80,6 +86,8 @@ public class StramWSFilterInitializer extends FilterInitializer
public String getProxyHostAndPort(Configuration conf)
{
String addr = conf.get(YarnConfiguration.PROXY_ADDRESS);
+ logger.info("proxy address setting {}", addr);
+ logger.debug("proxy setting sources {}", conf.getPropertySources(YarnConfiguration.PROXY_ADDRESS));
if (addr == null || addr.isEmpty()) {
addr = getResolvedRMWebAppURLWithoutScheme(conf, null);
}
@@ -96,27 +104,29 @@ public class StramWSFilterInitializer extends FilterInitializer
boolean sslEnabled = conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
- return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : null);
+ return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : "");
}
/*
From org.apache.hadoop.yarn.webapp.util.WebAppUtils
Modified for HA support
*/
- public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmId)
+ public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmPrpKey)
{
InetSocketAddress address = null;
if (sslEnabled) {
address =
- conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmId,
+ conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmPrpKey,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
} else {
address =
- conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmId,
+ conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmPrpKey,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
}
+ logger.info("rm webapp address setting {}", address);
+ logger.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS));
address = NetUtils.getConnectAddress(address);
StringBuffer sb = new StringBuffer();
InetAddress resolved = address.getAddress();
@@ -133,6 +143,7 @@ public class StramWSFilterInitializer extends FilterInitializer
sb.append(address.getHostName());
}
sb.append(":").append(address.getPort());
+ logger.info("rm webapp resolved hostname {}", sb.toString());
return sb.toString();
}