Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/06 23:44:15 UTC

[05/11] storm git commit: Moving token addition into populateSubject so that on renewal the tokens are automatically added to the UGI. Logging into hadoop again before the renewal attempt to ensure other hadoop logins do not interfere.

Moving token addition into populateSubject so that on renewal the tokens are automatically added to the UGI. Logging into hadoop again before the renewal attempt to ensure other hadoop logins do not interfere.
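In outline, the change makes both subject-population paths copy the delegation tokens into the process-wide UGI, as in this condensed sketch (the method names are the ones introduced in the diffs below):

```java
// Condensed view of the new flow in AutoHBase/AutoHDFS (see the diffs below):
// populateSubject and updateSubject both attach the deserialized credentials
// to the Subject AND copy every delegation token into the current UGI, so a
// renewed token is picked up by hadoop clients without extra plumbing.
public void populateSubject(Subject subject, Map<String, String> credentials) {
    addCredentialToSubject(subject, credentials); // deserialize and attach Credentials
    addTokensToUGI(subject);                      // UserGroupInformation.getCurrentUser().addToken(token)
}
```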


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e590a727
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e590a727
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e590a727

Branch: refs/heads/master
Commit: e590a7272bbb4838cbc2a9acd524e34094af3d27
Parents: b7b20fc
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 11 20:13:20 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:18:22 2014 -0800

----------------------------------------------------------------------
 external/storm-hbase/README.md                  | 11 ++-
 .../apache/storm/hbase/PersistentWordCount.java | 92 ++++++++++++++++++++
 .../apache/storm/hbase/security/AutoHBase.java  | 53 ++++++++---
 .../storm/hbase/security/HBaseSecurityUtil.java | 18 ----
 external/storm-hdfs/README.md                   |  8 +-
 .../storm/hdfs/common/security/AutoHDFS.java    | 81 ++++++++++++-----
 .../hdfs/common/security/HdfsSecurityUtil.java  | 19 ----
 7 files changed, 204 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hbase/README.md b/external/storm-hbase/README.md
index 61d67f3..81d351a 100644
--- a/external/storm-hbase/README.md
+++ b/external/storm-hbase/README.md
@@ -47,7 +47,7 @@ StormSubmitter.submitTopology("$topologyName", config, builder.createTopology())
 ```
 
 ## Working with Secure HBase using delegation tokens.
-If your topology is going to interact with secure HBase, your bolts/states needs to be authenticated by HBase Master. 
+If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase. 
 The approach described above requires that all potential worker hosts have "storm.keytab.file" on them. If you have 
 multiple topologies on a cluster, each with a different hbase user, you will have to create multiple keytabs and distribute
 them to all workers. Instead of doing that you can use the following approach:
@@ -57,15 +57,18 @@ The nimbus need to start with following configurations:
 
 nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
 nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
-storm.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
-storm.kerberos.principal: "superuser@EXAMPLE.com"
+hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
+hbase.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 518400 (6 days; hbase tokens by default expire every 7 days and can not be renewed. 
+If you have a custom setting for hbase.auth.token.max.lifetime in hbase-site.xml, you should ensure this value is 
+at least 1 hour less than that.)
 
 Your topology configuration should have:
 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
 
 If nimbus does not have the above configuration you need to add it and then restart it. Ensure the hbase configuration 
 files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present in nimbus's classpath. 
-Nimbus will use the keytab and principal specified in the config to authenticate with HBase master node. From then on for every
+Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on for every
 topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
 topology submitter user. If the topology was started with topology.auto-credentials set to AutoHBase, nimbus will push the
 delegation tokens to all the workers for your topology and the hbase bolt/state will authenticate with these tokens.
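On the topology side this boils down to a single config entry; a minimal submission sketch with AutoHBase enabled (the key string is the one from the README above, the topology name is illustrative):

```java
import java.util.Arrays;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;

// Ask nimbus to push HBase delegation tokens to this topology's workers.
Config config = new Config();
config.put("topology.auto-credentials",
        Arrays.asList("org.apache.storm.hbase.security.AutoHBase"));
StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
```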

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
new file mode 100644
index 0000000..94a8ff5
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase;
+
+import org.apache.storm.hbase.bolt.HBaseBolt;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+import org.apache.storm.hbase.security.HBaseSecurityUtil;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.storm.hbase.topology.WordCounter;
+import org.apache.storm.hbase.topology.WordSpout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        Map<String, Object> hbConf = new HashMap<String, Object>();
+        if(args.length > 0){
+            hbConf.put("hbase.rootdir", args[0]);
+        }
+        config.put("hbase.conf", hbConf);
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+                .withRowKeyField("word")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withColumnFamily("cf");
+
+        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+                .withConfigKey("hbase.conf");
+
+
+        // wordSpout ==> countBolt ==> HBaseBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 1) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 2) {
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else if (args.length == 4) {
+            System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] + 
+                ", principal name: " + args[3] + ", topology name: " + args[1]);
+            hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
+            hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
+            config.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
index 85f7683..02c81bb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -29,19 +29,19 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
 import javax.xml.bind.DatatypeConverter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.*;
 import java.net.InetAddress;
-import java.security.PrivilegedAction;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Automatically get hbase delegation tokens and push it to user's topology. The class
@@ -54,9 +54,15 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
     public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
 
+    public String hbaseKeytab;
+    public String hbasePrincipal;
+
     @Override
     public void prepare(Map conf) {
-        //no op.
+        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
+            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
+            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
+        }
     }
 
     @Override
@@ -69,13 +75,13 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
         try {
             credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
         } catch (Exception e) {
-            LOG.warn("Could not populate HBase credentials.", e);
+            LOG.error("Could not populate HBase credentials.", e);
         }
     }
 
     @Override
     public void populateCredentials(Map<String, String> credentials) {
-        //no op.
+        credentials.put(HBASE_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
     }
 
     /*
@@ -96,7 +102,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
                 credential.readFields(in);
                 LOG.info("Got hbase credentials from credentials Map.");
             } catch (Exception e) {
-                LOG.warn("Could not obtain credentials from credentials map.", e);
+                LOG.error("Could not obtain credentials from credentials map.", e);
             }
         }
         return credential;
@@ -108,6 +114,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     @Override
     public void updateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     /**
@@ -116,6 +123,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     @Override
     public void populateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     @SuppressWarnings("unchecked")
@@ -129,7 +137,28 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
                 LOG.info("No credential found in credentials map.");
             }
         } catch (Exception e) {
-            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    public void addTokensToUGI(Subject subject) {
+        if(subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -142,8 +171,8 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
 
                 UserProvider provider = UserProvider.instantiate(hbaseConf);
 
-                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, conf.get(HBASE_KEYTAB_FILE_KEY).toString());
-                hbaseConf.set(HBASE_PRINCIPAL_KEY, conf.get(HBASE_PRINCIPAL_KEY).toString());
+                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
+                hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
                 provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
 
                 LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index d941b66..36a4b89 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -49,24 +49,6 @@ public class HBaseSecurityUtil {
     public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
 
     public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
-        AccessControlContext context = AccessController.getContext();
-        Subject subject = Subject.getSubject(context);
-
-        if (subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            UserGroupInformation.getCurrentUser().addToken(token);
-                            LOG.info("Added Hbase delegation tokens to UGI.");
-                        }
-                    }
-                }
-            }
-        }
-
         //Allowing keytab based login for backward compatibility.
         UserProvider provider = UserProvider.instantiate(hbaseConfig);
         if (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 47e6db5..b37af7e 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -325,10 +325,12 @@ currently have 2 options to support this:
 Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
 Nimbus needs to start with the following configurations:
 
-nimbus.autocredential.plugins.classes : [org.apache.storm.hdfs.common.security.AutoHDFS"] 
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
 nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
 hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
 hdfs.kerberos.principal: "superuser@EXAMPLE.com" 
+nimbus.credential.renewers.freq.secs : 82800 (23 hours; hdfs tokens need to be renewed every 24 hours so this value should be
+less than 24 hours.)
 topology.hdfs.uri:"hdfs://host:port" (This is an optional config; by default we will use the value of the "fs.defaultFS" property
 specified in hadoop's core-site.xml)
 
@@ -351,8 +353,8 @@ http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superuse
 You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
 
 ### Using keytabs on all worker hosts
-If you have distributed the keytab files for hdfs user on all potential worker hosts then you can use this method. Your
-topology configuration should have:
+If you have distributed the keytab files for the hdfs user on all potential worker hosts then you can use this method. You should specify an 
+hdfs config key using the method HdfsBolt/State.withConfigKey("somekey") and the value map of this key should have the following 2 properties:
 
 hdfs.keytab.file: "/path/to/keytab/"
 hdfs.kerberos.principal: "user@EXAMPLE.com"
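As a hedged illustration of that wiring (assuming the HdfsBolt builder API used elsewhere in this README; "hdfs.config" is an arbitrary key name, and the other required builder calls such as file name format and sync policy are omitted):

```java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import org.apache.storm.hdfs.bolt.HdfsBolt;

// Put the keytab/principal pair in a map under an arbitrary key...
Map<String, Object> hdfsConf = new HashMap<String, Object>();
hdfsConf.put("hdfs.keytab.file", "/path/to/keytab/");
hdfsConf.put("hdfs.kerberos.principal", "user@EXAMPLE.com");

Config config = new Config();
config.put("hdfs.config", hdfsConf);

// ...and point the bolt at that key so it can log in from the workers.
HdfsBolt bolt = new HdfsBolt()
        .withConfigKey("hdfs.config");
```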

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
index ad9214e..faa21ac 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
@@ -25,6 +25,7 @@ import backtype.storm.security.auth.ICredentialsRenewer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -39,6 +40,10 @@ import java.security.PrivilegedAction;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
+import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
 
 /**
  * Automatically get HDFS delegation tokens and push it to user's topology. The class
@@ -49,9 +54,15 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
     public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
 
+    private String hdfsKeyTab;
+    private String hdfsPrincipal;
+
     @Override
     public void prepare(Map conf) {
-        //no op.
+        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
+            this.hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+            this.hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
+        }
     }
 
     @Override
@@ -65,13 +76,13 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
             credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
             LOG.info("HDFS tokens added to credentials map.");
         } catch (Exception e) {
-            LOG.warn("Could not populate HDFS credentials.", e);
+            LOG.error("Could not populate HDFS credentials.", e);
         }
     }
 
     @Override
     public void populateCredentials(Map<String, String> credentials) {
-        //no op.
+        credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
     }
 
     /*
@@ -91,7 +102,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                 credential = new Credentials();
                 credential.readFields(in);
             } catch (Exception e) {
-                LOG.warn("Could not obtain credentials from credentials map.", e);
+                LOG.error("Could not obtain credentials from credentials map.", e);
             }
         }
         return credential;
@@ -103,6 +114,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @Override
     public void updateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     /**
@@ -111,6 +123,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @Override
     public void populateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     @SuppressWarnings("unchecked")
@@ -124,7 +137,28 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                 LOG.info("No credential found in credentials");
             }
         } catch (Exception e) {
-            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    public void addTokensToUGI(Subject subject) {
+        if(subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -134,24 +168,19 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @Override
     @SuppressWarnings("unchecked")
     public void renew(Map<String, String> credentials, Map topologyConf) {
-        Credentials credential = getCredentials(credentials);
-        //maximum allowed expiration time until which tokens will keep renewing,
-        //currently set to 1 day.
-        final long MAX_ALLOWED_EXPIRATION_MILLIS = 24 * 60 * 60 * 1000;
-
         try {
+            Credentials credential = getCredentials(credentials);
             if (credential != null) {
                 Configuration configuration = new Configuration();
                 Collection<Token<? extends TokenIdentifier>> tokens = credential.getAllTokens();
 
                 if(tokens != null && tokens.isEmpty() == false) {
                     for (Token token : tokens) {
+                        //We need to re-login because some other thread might have logged into hadoop using
+                        // their credentials (e.g. AutoHBase might also be part of nimbus auto creds)
+                        login(configuration);
                         long expiration = (Long) token.renew(configuration);
-                        if (expiration < MAX_ALLOWED_EXPIRATION_MILLIS) {
-                            LOG.debug("expiration {} is less then MAX_ALLOWED_EXPIRATION_MILLIS {}, getting new tokens",
-                                    expiration, MAX_ALLOWED_EXPIRATION_MILLIS);
-                            populateCredentials(credentials, topologyConf);
-                        }
+                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
                     }
                 } else {
                     LOG.debug("No tokens found for credentials, skipping renewal.");
@@ -160,20 +189,19 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         } catch (Exception e) {
             LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
                     "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials);
+            populateCredentials(credentials, topologyConf);
         }
     }
 
     @SuppressWarnings("unchecked")
     protected byte[] getHadoopCredentials(Map conf) {
-
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
                 final Configuration configuration = new Configuration();
-                HdfsSecurityUtil.login(conf, configuration);
+
+                login(configuration);
 
                 final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-                final String hdfsUser = (String) conf.get(HdfsSecurityUtil.STORM_USER_NAME_KEY);
 
                 final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
                         : FileSystem.getDefaultUri(configuration);
@@ -189,7 +217,8 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                             FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
                             Credentials credential= proxyUser.getCredentials();
 
-                            fileSystem.addDelegationTokens(hdfsUser, credential);
+                            fileSystem.addDelegationTokens(hdfsPrincipal, credential);
+                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
                             return credential;
                         } catch (IOException e) {
                             throw new RuntimeException(e);
@@ -214,6 +243,14 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         }
     }
 
+    private void login(Configuration configuration) throws IOException {
+        configuration.set(STORM_KEYTAB_FILE_KEY, this.hdfsKeyTab);
+        configuration.set(STORM_USER_NAME_KEY, this.hdfsPrincipal);
+        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+
+        LOG.info("Logged into hdfs with principal {}", this.hdfsPrincipal);
+    }
+
     protected String getCredentialKey() {
         return HDFS_CREDENTIALS;
     }
@@ -222,8 +259,8 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     public static void main(String[] args) throws Exception {
         Map conf = new HashMap();
         conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(HdfsSecurityUtil.STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
-        conf.put(HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
+        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
+        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
 
         Configuration configuration = new Configuration();
         AutoHDFS autoHDFS = new AutoHDFS();
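Condensing the renew() changes above into one view (a restatement of the patch with null checks elided, not additional behavior):

```java
// Condensed view of the new AutoHDFS.renew() flow from the diff above.
public void renew(Map<String, String> credentials, Map topologyConf) {
    try {
        Credentials credential = getCredentials(credentials);
        Configuration configuration = new Configuration();
        for (Token<? extends TokenIdentifier> token : credential.getAllTokens()) {
            login(configuration);  // re-login first: another plugin (e.g. AutoHBase)
                                   // may have logged into hadoop in the meantime
            long expiration = token.renew(configuration);
            LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
        }
    } catch (Exception e) {
        // tokens beyond their renewal period: attempt to get brand new ones
        populateCredentials(credentials, topologyConf);
    }
}
```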

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
index d8b7f5d..5847a58 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
@@ -47,25 +47,6 @@ public class HdfsSecurityUtil {
     private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
 
     public static void login(Map conf, Configuration hdfsConfig) throws IOException {
-        //Add all tokens
-        AccessControlContext context = AccessController.getContext();
-        Subject subject = Subject.getSubject(context);
-
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            UserGroupInformation.getCurrentUser().addToken(token);
-                            LOG.info("Added delegation tokens to UGI.");
-                        }
-                    }
-                }
-            }
-        }
-
         //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
         if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName()))) {