You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:06 UTC

[05/50] [abbrv] incubator-apex-core git commit: APEX-149 #resolve Fixed the property name used to look up RM webapp address in non-HA mode

APEX-149 #resolve Fixed the property name used to look up RM webapp address in non-HA mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0a89c83c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0a89c83c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0a89c83c

Branch: refs/heads/master
Commit: 0a89c83c48c207cd282f264a8bf515621768eceb
Parents: 0a85586
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Sat Sep 26 10:58:59 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Sep 26 11:02:17 2015 -0700

----------------------------------------------------------------------
 .../stram/security/StramWSFilter.java           | 47 +++++++++++++++-----
 .../security/StramWSFilterInitializer.java      | 19 ++++++--
 2 files changed, 50 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0a89c83c/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
index 8be7fed..061bdc7 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 
 import com.datatorrent.stram.webapp.WebServices;
@@ -102,6 +103,7 @@ public class StramWSFilter implements Filter
         proxyAddresses = new HashSet<String>();
         for (String proxyHost : proxyHosts) {
           try {
+            logger.debug("resolving proxy hostname {}", proxyHost);
             for (InetAddress add : InetAddress.getAllByName(proxyHost)) {
               logger.debug("proxy address is: {}", add.getHostAddress());
               proxyAddresses.add(add.getHostAddress());
@@ -133,9 +135,8 @@ public class StramWSFilter implements Filter
 
     HttpServletRequest httpReq = (HttpServletRequest)req;
     HttpServletResponse httpResp = (HttpServletResponse)resp;
-    logger.debug("Remote address for request is: {}", httpReq.getRemoteAddr());
+    String remoteAddr = httpReq.getRemoteAddr();
     String requestURI = httpReq.getRequestURI();
-    logger.debug("Request path {}", requestURI);
     boolean authenticate = true;
     String user = null;
     if(getProxyAddresses().contains(httpReq.getRemoteAddr())) {
@@ -149,9 +150,11 @@ public class StramWSFilter implements Filter
       }
       if (requestURI.equals(WebServices.PATH) && (user != null)) {
         String token = createClientToken(user, httpReq.getLocalAddr());
-        logger.debug("Create token {}", token);
+        logger.debug("{}: creating token {}", remoteAddr, token);
         Cookie cookie = new Cookie(CLIENT_COOKIE, token);
         httpResp.addCookie(cookie);
+      } else {
+        logger.info("{}: proxy access to URI {} by user {}, no cookie created", remoteAddr, requestURI, user);
       }
       authenticate = false;
     }
@@ -167,19 +170,24 @@ public class StramWSFilter implements Filter
       }
       boolean valid = false;
       if (cookie != null) {
-        logger.debug("Verifying token {}", cookie.getValue());
-        user = verifyClientToken(cookie.getValue());
-        valid = true;
-        logger.debug("Token valid");
+        user = verifyClientToken(cookie.getValue(), remoteAddr);
+        if (user != null) {
+          valid = true;
+        } else {
+          logger.debug("{}: invalid cookie {}", remoteAddr, cookie.getValue());
+        }
+      } else {
+        logger.debug("{}: cookie not found {}", remoteAddr, CLIENT_COOKIE);
       }
       if (!valid) {
+        logger.debug("{}: auth failure", remoteAddr);
         httpResp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
         return;
       }
     }
 
     if(user == null) {
-      logger.debug("Could not find {} cookie, so user will not be set", WEBAPP_PROXY_USER);
+      logger.debug("{}: could not find user, so user principal will not be set", remoteAddr);
       chain.doFilter(req, resp);
     } else {
       final StramWSPrincipal principal = new StramWSPrincipal(user);
@@ -199,16 +207,31 @@ public class StramWSFilter implements Filter
     return token.encodeToUrlString();
   }
 
-  private String verifyClientToken(String tokenstr) throws IOException
+  private String verifyClientToken(String tokenstr, String cid) throws IOException
   {
     Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>();
-    token.decodeFromUrlString(tokenstr);
+    try {
+      token.decodeFromUrlString(tokenstr);
+    } catch (IOException e) {
+      logger.debug("{}: error decoding token: {}", cid, e.getMessage());
+      return null;
+    }
     byte[] identifier = token.getIdentifier();
     byte[] password = token.getPassword();
     StramDelegationTokenIdentifier tokenIdentifier = new StramDelegationTokenIdentifier();
     DataInputStream input = new DataInputStream(new ByteArrayInputStream(identifier));
-    tokenIdentifier.readFields(input);
-    tokenManager.verifyToken(tokenIdentifier, password);
+    try {
+      tokenIdentifier.readFields(input);
+    } catch (IOException e) {
+      logger.debug("{}: error decoding identifier: {}", cid, e.getMessage());
+      return null;
+    }
+    try {
+      tokenManager.verifyToken(tokenIdentifier, password);
+    } catch (SecretManager.InvalidToken e) {
+      logger.debug("{}: invalid token {}: {}", cid, tokenIdentifier, e.getMessage());
+      return null;
+    }
     return tokenIdentifier.getOwner().toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0a89c83c/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
index f4f8d22..a2b2821 100644
--- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.http.FilterInitializer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.stram.util.ConfigUtils;
 
 /**
@@ -43,12 +46,15 @@ import com.datatorrent.stram.util.ConfigUtils;
  */
 public class StramWSFilterInitializer extends FilterInitializer
 {
+  private static final Logger logger = LoggerFactory.getLogger(StramWSFilterInitializer.class);
+
   private static final String FILTER_NAME = "AM_PROXY_FILTER";
   private static final String FILTER_CLASS = StramWSFilter.class.getCanonicalName();
 
   @Override
   public void initFilter(FilterContainer container, Configuration conf)
   {
+    logger.debug("Conf {}", conf);
     Map<String, String> params = new HashMap<String, String>();
     Collection<String> proxies = new ArrayList<String>();
     if (ConfigUtils.isRMHAEnabled(conf)) {
@@ -80,6 +86,8 @@ public class StramWSFilterInitializer extends FilterInitializer
   public String getProxyHostAndPort(Configuration conf)
   {
     String addr = conf.get(YarnConfiguration.PROXY_ADDRESS);
+    logger.info("proxy address setting {}", addr);
+    logger.debug("proxy setting sources {}", conf.getPropertySources(YarnConfiguration.PROXY_ADDRESS));
     if (addr == null || addr.isEmpty()) {
       addr = getResolvedRMWebAppURLWithoutScheme(conf, null);
     }
@@ -96,27 +104,29 @@ public class StramWSFilterInitializer extends FilterInitializer
     boolean sslEnabled = conf.getBoolean(
             CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY,
             CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
-    return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : null);
+    return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : "");
   }
 
   /*
     From org.apache.hadoop.yarn.webapp.util.WebAppUtils
     Modified for HA support
   */
-  public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmId)
+  public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmPrpKey)
   {
     InetSocketAddress address = null;
     if (sslEnabled) {
       address =
-              conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmId,
+              conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmPrpKey,
                       YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
                       YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
     } else {
       address =
-              conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmId,
+              conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmPrpKey,
                       YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
                       YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
     }
+    logger.info("rm webapp address setting {}", address);
+    logger.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS));
     address = NetUtils.getConnectAddress(address);
     StringBuffer sb = new StringBuffer();
     InetAddress resolved = address.getAddress();
@@ -133,6 +143,7 @@ public class StramWSFilterInitializer extends FilterInitializer
       sb.append(address.getHostName());
     }
     sb.append(":").append(address.getPort());
+    logger.info("rm webapp resolved hostname {}", sb.toString());
     return sb.toString();
   }