You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/08 05:05:45 UTC

[1/6] storm git commit: [STORM-2482] Refactor the Storm auto credential plugins to be more usable

Repository: storm
Updated Branches:
  refs/heads/master 5b270d3d7 -> 0639244f7


http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ac6928b..2ec2e6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -332,6 +332,7 @@
         <module>sql</module>
 
         <!-- externals -->
+        <module>external/storm-autocreds</module>
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hbase</module>

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/storm-dist/binary/final-package/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index 46cf1b2..0d718e2 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -302,7 +302,14 @@
                 <include>storm*jar</include>
             </includes>
         </fileSet>
-
+       <!-- Autocred plugins -->
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-autocreds/target/app-assembler/repo</directory>
+            <outputDirectory>external/storm-autocreds</outputDirectory>
+            <includes>
+                <include>*jar</include>
+            </includes>
+        </fileSet>
     </fileSets>
 
     <files>


[3/6] storm git commit: [STORM-2482] addressing review comments

Posted by ka...@apache.org.
[STORM-2482] addressing review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/289d6ff7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/289d6ff7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/289d6ff7

Branch: refs/heads/master
Commit: 289d6ff743c42ef3cf7bd1e9e8fe52652aeac457
Parents: a0122ae
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Apr 27 18:17:19 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Wed May 3 14:13:38 2017 +0530

----------------------------------------------------------------------
 docs/storm-hbase.md                                     |  2 +-
 .../java/org/apache/storm/common/AbstractAutoCreds.java | 12 +++++-------
 2 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/289d6ff7/docs/storm-hbase.md
----------------------------------------------------------------------
diff --git a/docs/storm-hbase.md b/docs/storm-hbase.md
index 0fb2c14..ec88054 100644
--- a/docs/storm-hbase.md
+++ b/docs/storm-hbase.md
@@ -98,7 +98,7 @@ topology submission, nimbus will impersonate the topology submitter user and acq
 topology submitter user. If topology was started with topology.auto-credentials set to AutoHBase, nimbus will push the
 delegation tokens to all the workers for your topology and the hbase bolt/state will authenticate with these tokens.
 
-As nimbus is impersonating topology submitter user, you need to ensure the user specified in storm.kerberos.principal 
+As nimbus is impersonating topology submitter user, you need to ensure the user specified in hbase.kerberos.principal 
 has permissions to acquire tokens on behalf of other users. To achieve this you need to follow configuration directions 
 listed on this link
 

http://git-wip-us.apache.org/repos/asf/storm/blob/289d6ff7/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
index 816e263..cc374c9 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
@@ -74,7 +74,7 @@ public abstract class AbstractAutoCreds implements IAutoCredentials, ICredential
     @Override
     public void populateCredentials(Map<String, String> credentials, Map conf) {
         try {
-            if (configKeys != null) {
+            if (!configKeys.isEmpty()) {
                 Map<String, Object> updatedConf = updateConfigs(conf);
                 for (String configKey : configKeys) {
                     credentials.put(getCredentialKey(configKey),
@@ -92,11 +92,9 @@ public abstract class AbstractAutoCreds implements IAutoCredentials, ICredential
 
     private Map<String, Object> updateConfigs(Map topologyConf) {
         Map<String, Object> res = new HashMap<>(topologyConf);
-        if (configKeys != null) {
-            for (String configKey : configKeys) {
-                if (!res.containsKey(configKey) && configMap.containsKey(configKey)) {
-                    res.put(configKey, configMap.get(configKey));
-                }
+        for (String configKey : configKeys) {
+            if (!res.containsKey(configKey) && configMap.containsKey(configKey)) {
+                res.put(configKey, configMap.get(configKey));
             }
         }
         return res;
@@ -134,7 +132,7 @@ public abstract class AbstractAutoCreds implements IAutoCredentials, ICredential
 
     protected Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials) {
         Set<Pair<String, Credentials>> res = new HashSet<>();
-        if (configKeys != null) {
+        if (!configKeys.isEmpty()) {
             for (String configKey : configKeys) {
                 Credentials cred = doGetCredentials(credentials, configKey);
                 if (cred != null) {


[4/6] storm git commit: [STORM-2482] Fixes to get the storm-autocreds plugin working in master

Posted by ka...@apache.org.
[STORM-2482] Fixes to get the storm-autocreds plugin working in master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56310f90
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56310f90
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56310f90

Branch: refs/heads/master
Commit: 56310f90cc49539c18b3701bc2256dbed816ab2c
Parents: 289d6ff
Author: Arun Mahadevan <ar...@apache.org>
Authored: Wed May 3 14:52:57 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Wed May 3 18:09:50 2017 +0530

----------------------------------------------------------------------
 external/storm-autocreds/pom.xml                                 | 4 ++--
 .../src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java   | 4 ----
 .../java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java    | 3 +--
 external/storm-hbase/pom.xml                                     | 1 -
 pom.xml                                                          | 1 +
 storm-dist/binary/final-package/src/main/assembly/binary.xml     | 2 +-
 6 files changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/56310f90/external/storm-autocreds/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
index 3762cfd..c6dcb7e 100644
--- a/external/storm-autocreds/pom.xml
+++ b/external/storm-autocreds/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>1.1.1-SNAPSHOT</version>
+        <version>2.0.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
@@ -32,7 +32,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
+            <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
             <scope>${provided.scope}</scope>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/storm/blob/56310f90/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
index e1e8512..5601ea4 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
@@ -21,7 +21,6 @@ package org.apache.storm.hdfs.security;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -37,12 +36,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.net.URI;
-import java.nio.file.Paths;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;

http://git-wip-us.apache.org/repos/asf/storm/blob/56310f90/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
index c0f3c79..94ea811 100644
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
@@ -17,11 +17,10 @@
  */
 package org.apache.storm.hdfs.security;
 
-import org.apache.storm.security.auth.kerberos.AutoTGT;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.storm.security.auth.kerberos.AutoTGT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/56310f90/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index ea49ad0..4e763ab 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -36,7 +36,6 @@
     </developers>
 
     <properties>
-        <hbase.version>1.1.0</hbase.version>
         <hdfs.version>${hadoop.version}</hdfs.version>
         <caffeine.version>2.3.5</caffeine.version>
     </properties>

http://git-wip-us.apache.org/repos/asf/storm/blob/56310f90/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2ec2e6c..282a7fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -276,6 +276,7 @@
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
         <hive.version>0.14.0</hive.version>
         <hadoop.version>2.6.1</hadoop.version>
+        <hbase.version>1.1.0</hbase.version>
         <kryo.version>3.0.3</kryo.version>
         <servlet.version>2.5</servlet.version>
         <joda-time.version>2.3</joda-time.version>

http://git-wip-us.apache.org/repos/asf/storm/blob/56310f90/storm-dist/binary/final-package/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index 0d718e2..08b0035 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -304,7 +304,7 @@
         </fileSet>
        <!-- Autocred plugins -->
         <fileSet>
-            <directory>${project.basedir}/../../external/storm-autocreds/target/app-assembler/repo</directory>
+            <directory>${project.basedir}/../../../external/storm-autocreds/target/app-assembler/repo</directory>
             <outputDirectory>external/storm-autocreds</outputDirectory>
             <includes>
                 <include>*jar</include>


[6/6] storm git commit: STORM-2482: CHANGELOG

Posted by ka...@apache.org.
STORM-2482: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0639244f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0639244f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0639244f

Branch: refs/heads/master
Commit: 0639244f7127e57afae46136daf296155704ced9
Parents: f342d5c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 8 14:05:06 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 8 14:05:06 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0639244f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c713f5d..4343c28 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -243,6 +243,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.1
+ * STORM-2482: Refactor the Storm auto credential plugins to be more usable
  * STORM-2343: New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once.
  * STORM-2488: The UI user Must be HTTP
  * STORM-2315: New kafka spout can't commit offset when ack is disabled


[2/6] storm git commit: [STORM-2482] Refactor the Storm auto credential plugins to be more usable

Posted by ka...@apache.org.
[STORM-2482] Refactor the Storm auto credential plugins to be more usable

1. Create a new storm module storm-autocreds
2. Move AutoHDFS and AutoHBase to storm-autocreds
3. Refactor code and accepts config keys for customizing the hadoop configuration for the plugins
4. Package the auto cred and dependency jars in the storm binary and deploy to lib-autocreds


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a0122aed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a0122aed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a0122aed

Branch: refs/heads/master
Commit: a0122aed59ea0e151a6017593afec550f88e5081
Parents: 2c597e5
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Apr 18 13:29:56 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Wed May 3 14:13:31 2017 +0530

----------------------------------------------------------------------
 docs/SECURITY.md                                |  18 +-
 docs/storm-hbase.md                             |  35 ++-
 docs/storm-hdfs.md                              |  43 ++-
 external/storm-autocreds/pom.xml                | 103 +++++++
 .../apache/storm/common/AbstractAutoCreds.java  | 250 +++++++++++++++++
 .../apache/storm/hbase/security/AutoHBase.java  | 179 ++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  73 +++++
 .../apache/storm/hdfs/security/AutoHDFS.java    | 216 ++++++++++++++
 .../storm/hdfs/security/HdfsSecurityUtil.java   |  69 +++++
 external/storm-hbase/pom.xml                    |   5 +
 .../apache/storm/hbase/security/AutoHBase.java  | 243 ----------------
 .../storm/hbase/security/HBaseSecurityUtil.java |  72 -----
 external/storm-hdfs/pom.xml                     |   5 +
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   2 +-
 .../storm/hdfs/common/security/AutoHDFS.java    | 281 -------------------
 .../hdfs/common/security/HdfsSecurityUtil.java  |  67 -----
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |   2 +-
 .../apache/storm/hdfs/trident/HdfsState.java    |   2 +-
 pom.xml                                         |   1 +
 .../final-package/src/main/assembly/binary.xml  |   9 +-
 20 files changed, 982 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/docs/SECURITY.md
----------------------------------------------------------------------
diff --git a/docs/SECURITY.md b/docs/SECURITY.md
index 66566ce..e73e873 100644
--- a/docs/SECURITY.md
+++ b/docs/SECURITY.md
@@ -424,16 +424,18 @@ nimbus.impersonation.acl:
 
 ### Automatic Credentials Push and Renewal
 Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services.  Exposing this to all of the users can be a pain for them.
-To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed.
-These are controlled by the following configs. topology.auto-credentials is a list of java plugins, all of which must implement IAutoCredentials interface, that populate the credentials on gateway 
-and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to org.apache.storm.security.auth.kerberos.AutoTGT.  
-nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user.
+To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed. These are controlled by the following configs.
+ 
+`topology.auto-credentials` is a list of java plugins, all of which must implement the `IAutoCredentials` interface, that populate the credentials on gateway 
+and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to `org.apache.storm.security.auth.kerberos.AutoTGT`
+
+`nimbus.credential.renewers.classes` should also be set to `org.apache.storm.security.auth.kerberos.AutoTGT` so that nimbus can periodically renew the TGT on behalf of the user.
 
-nimbus.credential.renewers.freq.secs controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine.
+`nimbus.credential.renewers.freq.secs` controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine.
 
-In addition Nimbus itself can be used to get credentials on behalf of the user submitting topologies. This can be configures using nimbus.autocredential.plugins.classes which is a list 
-of fully qualified class names ,all of which must implement INimbusCredentialPlugin.  Nimbus will invoke the populateCredentials method of all the configured implementation as part of topology
-submission. You should use this config with topology.auto-credentials and nimbus.credential.renewers.classes so the credentials can be populated on worker side and nimbus can automatically renew
+In addition Nimbus itself can be used to get credentials on behalf of the user submitting topologies. This can be configured using `nimbus.autocredential.plugins.classes` which is a list 
+of fully qualified class names, all of which must implement `INimbusCredentialPlugin`.  Nimbus will invoke the populateCredentials method of all the configured implementations as part of topology
+submission. You should use this config with `topology.auto-credentials` and `nimbus.credential.renewers.classes` so the credentials can be populated on worker side and nimbus can automatically renew
 them. Currently there are 2 examples of using this config, AutoHDFS and AutoHBase which auto populates hdfs and hbase delegation tokens for topology submitter so they don't have to distribute keytabs
 on all possible worker hosts.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/docs/storm-hbase.md
----------------------------------------------------------------------
diff --git a/docs/storm-hbase.md b/docs/storm-hbase.md
index a1b0764..0fb2c14 100644
--- a/docs/storm-hbase.md
+++ b/docs/storm-hbase.md
@@ -56,22 +56,43 @@ The approach described above requires that all potential worker hosts have "stor
 multiple topologies on a cluster , each with different hbase user, you will have to create multiple keytabs and distribute
 it to all workers. Instead of doing that you could use the following approach:
 
-Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
-The nimbus need to start with following configurations:
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. The nimbus should be started with following configurations:
 
+```
 nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
 nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
 hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
 hbase.kerberos.principal: "superuser@EXAMPLE.com"
-nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, 
-if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is 
-atleast 1 hour less then that.)
+nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed; if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml then you should ensure this value is at least 1 hour less than that.)
+```
 
 Your topology configuration should have:
-topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
+
+```
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]
+```
 
 If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hbase configuration 
-files(core-site.xml,hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all the dependencies is present in nimbus's classpath. 
+files(core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all the dependencies is present in nimbus's classpath.
+
+As an alternative to adding the configuration files (core-site.xml, hdfs-site.xml and hbase-site.xml) to the classpath, you could specify the configurations as a part of the topology configuration. E.g. in your custom storm.yaml (or -c option while submitting the topology),
+
+```
+hbaseCredentialsConfigKeys : ["cluster1", "cluster2"] (the hbase clusters you want to fetch the tokens from)
+cluster1: [{"config1": "value1", "config2": "value2", ... }] (A map of config key-values specific to cluster1)
+cluster2: [{"config1": "value1", "hbase.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hbase.kerberos.principal": "cluster2user@EXAMPLE.com"}] (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level)
+```
+
+Instead of specifying key values you may also directly specify the resource files for e.g.,
+
+```
+cluster1: [{"resources": ["/path/to/core-site1.xml", "/path/to/hbase-site1.xml"]}]
+cluster2: [{"resources": ["/path/to/core-site2.xml", "/path/to/hbase-site2.xml"]}]
+```
+
+Storm will download the tokens separately for each of the clusters and populate it into the subject and also renew the tokens periodically. 
+This way it would be possible to run multiple bolts connecting to separate HBase clusters within the same topology.
+
 Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on for every
 topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
 topology submitter user. If topology was started with topology.auto-credentials set to AutoHBase, nimbus will push the

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/docs/storm-hdfs.md
----------------------------------------------------------------------
diff --git a/docs/storm-hdfs.md b/docs/storm-hdfs.md
index d2c7567..8391efd 100644
--- a/docs/storm-hdfs.md
+++ b/docs/storm-hdfs.md
@@ -412,23 +412,44 @@ If your topology is going to interact with secure HDFS, your bolts/states needs
 currently have 2 options to support this:
 
 ### Using HDFS delegation tokens 
-Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
-The nimbus need to start with following configurations:
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. The nimbus should be started with following configurations:
 
-nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
-nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+```
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.security.AutoHDFS"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.security.AutoHDFS"]
 hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
 hdfs.kerberos.principal: "superuser@EXAMPLE.com" 
-nimbus.credential.renewers.freq.secs : 82800 (23 hours, hdfs tokens needs to be renewed every 24 hours so this value should be
-less then 24 hours.)
-topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default we will use value of "fs.defaultFS" property
-specified in hadoop's core-site.xml)
+nimbus.credential.renewers.freq.secs : 82800 (23 hours, hdfs tokens need to be renewed every 24 hours so this value should be less than 24 hours.)
+topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default we will use value of "fs.defaultFS" property specified in hadoop's core-site.xml)
+```
 
 Your topology configuration should have:
-topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"] 
 
-If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hadoop configuration 
-files(core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all the dependencies is present in nimbus's classpath. 
+```
+topology.auto-credentials :["org.apache.storm.hdfs.security.AutoHDFS"]
+```
+
+If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hadoop configuration 
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all the dependencies is present in nimbus's classpath.
+
+As an alternative to adding the configuration files (core-site.xml and hdfs-site.xml) to the classpath, you could specify the configurations
+as a part of the topology configuration. E.g. in your custom storm.yaml (or -c option while submitting the topology),
+
+```
+hdfsCredentialsConfigKeys : ["cluster1", "cluster2"] (the hdfs clusters you want to fetch the tokens from)
+cluster1: [{"config1": "value1", "config2": "value2", ... }] (A map of config key-values specific to cluster1)
+cluster2: [{"config1": "value1", "hdfs.keytab.file": "/path/to/keytab/for/cluster2/on/nimbus", "hdfs.kerberos.principal": "cluster2user@EXAMPLE.com"}] (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level)
+```
+
+Instead of specifying key values you may also directly specify the resource files for e.g.,
+
+```
+cluster1: [{"resources": ["/path/to/core-site1.xml", "/path/to/hdfs-site1.xml"]}]
+cluster2: [{"resources": ["/path/to/core-site2.xml", "/path/to/hdfs-site2.xml"]}]
+```
+
+Storm will download the tokens separately for each of the clusters and populate it into the subject and also renew the tokens periodically. This way it would be possible to run multiple bolts connecting to separate HDFS clusters within the same topology.
+
 Nimbus will use the keytab and principal specified in the config to authenticate with Namenode. From then on for every
 topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
 topology submitter user. If topology was started with topology.auto-credentials set to AutoHDFS, nimbus will push the

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
new file mode 100644
index 0000000..3762cfd
--- /dev/null
+++ b/external/storm-autocreds/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.1-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-autocreds</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
+            <exclusions>
+                <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+                    see: http://stackoverflow.com/q/20469026/3542091 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+    <build>
+    <plugins>
+       <!-- Builds a flat jar repository under ${project.build.directory}/app-assembler.
+            The binary distribution assembly (storm-dist .../assembly/binary.xml) copies the
+            jars from the "repo" sub-directory of that location into external/storm-autocreds. -->
+       <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>appassembler-maven-plugin</artifactId>
+          <version>1.9</version>
+          <executions>
+                    <execution>
+                        <id>create-repo</id>
+                        <goals>
+                            <goal>create-repository</goal>
+                        </goals>
+                        <configuration>
+                            <assembleDirectory>${project.build.directory}/app-assembler</assembleDirectory>
+                            <repositoryLayout>flat</repositoryLayout>
+                        </configuration>
+                    </execution>
+          </executions>
+        </plugin>
+    </plugins>
+   </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
new file mode 100644
index 0000000..816e263
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for the auto credential plugins that abstracts out their common functionality:
+ * reading the configured config keys, storing Base64-encoded Hadoop credentials in the
+ * credentials map, and adding those credentials (and their tokens) to the {@link Subject}
+ * and to the current {@link UserGroupInformation}.
+ */
+public abstract class AbstractAutoCreds implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractAutoCreds.class);
+    public static final String CONFIG_KEY_RESOURCES = "resources";
+
+    // Config keys found under getConfigKeyString() in prepare(); stays empty when none are configured.
+    private List<String> configKeys = new ArrayList<>();
+    // Config key -> config section, captured from the conf handed to prepare().
+    private Map<String, Map<String, Object>> configMap = new HashMap<>();
+
+    /**
+     * Lets the subclass prepare itself and then records the config keys (and the config
+     * section of each key, when present) found in the given conf.
+     *
+     * @param conf the conf to read the config keys from
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void prepare(Map conf) {
+        doPrepare(conf);
+        String configKeyString = getConfigKeyString();
+        if (conf.containsKey(configKeyString)) {
+            configKeys.addAll((List<String>) conf.get(configKeyString));
+            for (String key : configKeys) {
+                if (conf.containsKey(key)) {
+                    Map<String, Object> config = (Map<String, Object>) conf.get(key);
+                    configMap.put(key, config);
+                    LOG.info("configKey = {}, config = {}", key, config);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores the Base64-encoded Hadoop credentials in the credentials map: one entry per
+     * config key when config keys are configured, otherwise a single default entry stored
+     * under {@code getCredentialKey("")}. Failures are logged and not propagated.
+     *
+     * @param credentials the credentials map to populate
+     * @param conf        the topology conf
+     */
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map conf) {
+        try {
+            // configKeys is never null, so the emptiness check (not a null check) is what
+            // selects between the per-key path and the default path.
+            if (hasConfigKeys()) {
+                Map<String, Object> updatedConf = updateConfigs(conf);
+                for (String configKey : configKeys) {
+                    credentials.put(getCredentialKey(configKey),
+                            DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, configKey)));
+                }
+            } else {
+                credentials.put(getCredentialKey(StringUtils.EMPTY),
+                        DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            }
+            LOG.info("Tokens added to credentials map.");
+        } catch (Exception e) {
+            LOG.error("Could not populate credentials.", e);
+        }
+    }
+
+    /**
+     * @return true when at least one config key was found during {@link #prepare(Map)}
+     */
+    private boolean hasConfigKeys() {
+        return configKeys != null && !configKeys.isEmpty();
+    }
+
+    /**
+     * Returns a copy of the topology conf in which every config key that the topology conf
+     * does not define itself is filled in from the sections captured in {@link #prepare(Map)}.
+     */
+    private Map<String, Object> updateConfigs(Map topologyConf) {
+        Map<String, Object> res = new HashMap<>(topologyConf);
+        if (configKeys != null) {
+            for (String configKey : configKeys) {
+                if (!res.containsKey(configKey) && configMap.containsKey(configKey)) {
+                    res.put(configKey, configMap.get(configKey));
+                }
+            }
+        }
+        return res;
+    }
+
+    /**
+     * Delegates to {@link #doRenew(Map, Map)} with the topology conf completed by
+     * {@code updateConfigs}.
+     */
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        doRenew(credentials, updateConfigs(topologyConf));
+    }
+
+    /**
+     * Stores a Base64-encoded place holder under the default credential key.
+     */
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        credentials.put(getCredentialKey(StringUtils.EMPTY),
+                DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
+    }
+
+    /**
+     * Decodes the Hadoop credentials found in the credentials map.
+     *
+     * @param credentials the credentials map
+     * @return (config key, credentials) pairs; the config key is the empty string for the
+     *         default entry used when no config keys are configured. Entries that are missing
+     *         or cannot be decoded are left out.
+     */
+    protected Set<Pair<String, Credentials>> getCredentials(Map<String, String> credentials) {
+        Set<Pair<String, Credentials>> res = new HashSet<>();
+        // Mirror populateCredentials: fall back to the default entry when no keys are configured.
+        if (hasConfigKeys()) {
+            for (String configKey : configKeys) {
+                Credentials cred = doGetCredentials(credentials, configKey);
+                if (cred != null) {
+                    res.add(new Pair<String, Credentials>(configKey, cred));
+                }
+            }
+        } else {
+            Credentials cred = doGetCredentials(credentials, StringUtils.EMPTY);
+            if (cred != null) {
+                res.add(new Pair<String, Credentials>(StringUtils.EMPTY, cred));
+            }
+        }
+        return res;
+    }
+
+    /**
+     * Copies the config section stored under {@code configKey} into the given Hadoop
+     * configuration. The {@value #CONFIG_KEY_RESOURCES} entry is treated as a list of file
+     * paths to add as resources; every other entry is set as a plain key/value. The
+     * configuration is then handed to {@link UserGroupInformation#setConfiguration}.
+     *
+     * @param topoConf      the topology conf holding the config section
+     * @param configKey     the key of the config section
+     * @param configuration the configuration to fill
+     */
+    @SuppressWarnings("unchecked")
+    protected void fillHadoopConfiguration(Map topoConf, String configKey, Configuration configuration) {
+        Map<String, Object> config = (Map<String, Object>) topoConf.get(configKey);
+        LOG.info("TopoConf {}, got config {}, for configKey {}", topoConf, config, configKey);
+        if (config != null) {
+            List<String> resourcesToLoad = new ArrayList<>();
+            for (Map.Entry<String, Object> entry : config.entrySet()) {
+                if (entry.getKey().equals(CONFIG_KEY_RESOURCES)) {
+                    resourcesToLoad.addAll((List<String>) entry.getValue());
+                } else {
+                    configuration.set(entry.getKey(), String.valueOf(entry.getValue()));
+                }
+            }
+            LOG.info("Resources to load {}", resourcesToLoad);
+            // add configs from resources like hdfs-site.xml
+            for (String pathStr : resourcesToLoad) {
+                configuration.addResource(new Path(Paths.get(pathStr).toUri()));
+            }
+        }
+        LOG.info("Initializing UGI with config {}", configuration);
+        UserGroupInformation.setConfiguration(configuration);
+    }
+
+    /**
+     * Prepare the plugin
+     *
+     * @param conf the storm cluster conf set via storm.yaml
+     */
+    protected abstract void doPrepare(Map conf);
+
+    /**
+     * The lookup key for the config key string
+     *
+     * @return the config key string
+     */
+    protected abstract String getConfigKeyString();
+
+    /**
+     * The key with which the credentials are stored in the credentials map
+     *
+     * @param configKey the config key, or the empty string for the default entry
+     */
+    protected abstract String getCredentialKey(String configKey);
+
+    /**
+     * Obtains the serialized Hadoop credentials for the given config key.
+     */
+    protected abstract byte[] getHadoopCredentials(Map conf, String configKey);
+
+    /**
+     * Obtains the serialized Hadoop credentials for the default (no config key) case.
+     */
+    protected abstract byte[] getHadoopCredentials(Map conf);
+
+    /**
+     * Renews the credentials; called from {@link #renew(Map, Map)} with the completed conf.
+     */
+    protected abstract void doRenew(Map<String, String> credentials, Map topologyConf);
+
+    /**
+     * Adds every decoded {@link Credentials} object to the subject's private credentials.
+     * Failures are logged and not propagated.
+     */
+    @SuppressWarnings("unchecked")
+    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
+        try {
+            for (Pair<String, Credentials> cred : getCredentials(credentials)) {
+                subject.getPrivateCredentials().add(cred.getSecond());
+                LOG.info("Credentials added to the subject.");
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    /**
+     * Adds every token of every {@link Credentials} object held by the subject to the
+     * current {@link UserGroupInformation}.
+     */
+    private void addTokensToUGI(Subject subject) {
+        if (subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Decodes the credentials stored for the given config key.
+     *
+     * @return the decoded credentials, or null when the entry is missing or cannot be decoded
+     */
+    private Credentials doGetCredentials(Map<String, String> credentials, String configKey) {
+        if (credentials == null || !credentials.containsKey(getCredentialKey(configKey))) {
+            return null;
+        }
+        try {
+            byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey(configKey)));
+            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes))) {
+                Credentials credential = new Credentials();
+                credential.readFields(in);
+                // Only hand the object out once it has been read completely, so that a
+                // failed read never yields a partially populated Credentials instance.
+                return credential;
+            }
+        } catch (Exception e) {
+            LOG.error("Could not obtain credentials from credentials map.", e);
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
new file mode 100644
index 0000000..fcbb463
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.security;
+
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractAutoCreds;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.*;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Automatically gets HBase delegation tokens and pushes them to the user's topology. The class
+ * assumes that hadoop/hbase configuration files are in your class path.
+ */
+public class AutoHBase extends AbstractAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
+
+    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
+    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
+    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
+
+    // Keytab and principal taken from the conf passed to doPrepare(); used as the fallback
+    // when the HBase configuration does not define its own.
+    public String hbaseKeytab;
+    public String hbasePrincipal;
+
+    /**
+     * Remembers the keytab and principal from the conf; both are read only when both keys
+     * are present.
+     */
+    @Override
+    public void doPrepare(Map conf) {
+        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
+            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
+            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
+        }
+    }
+
+    @Override
+    protected String getConfigKeyString() {
+        return HBaseSecurityUtil.HBASE_CREDENTIALS_CONFIG_KEYS;
+    }
+
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    /**
+     * Obtains the serialized credentials using an HBase configuration filled from the
+     * config section stored under {@code configKey}.
+     */
+    @Override
+    protected  byte[] getHadoopCredentials(Map conf, String configKey) {
+        Configuration configuration = getHadoopConfiguration(conf, configKey);
+        return getHadoopCredentials(conf, configuration);
+    }
+
+    /**
+     * Obtains the serialized credentials using the default HBase configuration.
+     */
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        return getHadoopCredentials(conf, HBaseConfiguration.create());
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = HBaseConfiguration.create();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    /**
+     * Logs in to HBase with the keytab/principal, obtains a token for a proxy user created
+     * for the topology submitter and returns that user's serialized credentials.
+     *
+     * @param conf      the topology conf; supplies the topology submitter principal
+     * @param hbaseConf the HBase configuration; keytab and principal fall back to the values
+     *                  captured in {@link #doPrepare(Map)} when not set here
+     * @return the serialized credentials of the proxy user
+     * @throws RuntimeException if security is not enabled or the tokens cannot be obtained
+     */
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf, Configuration hbaseConf) {
+        try {
+            if(UserGroupInformation.isSecurityEnabled()) {
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                UserProvider provider = UserProvider.instantiate(hbaseConf);
+
+                if (hbaseConf.get(HBASE_KEYTAB_FILE_KEY) == null) {
+                    hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
+                }
+                if (hbaseConf.get(HBASE_PRINCIPAL_KEY) == null) {
+                    hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
+                }
+                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
+
+                // The principal used for the login lives in hbaseConf, not in the topology conf.
+                LOG.info("Logged into Hbase as principal = " + hbaseConf.get(HBASE_PRINCIPAL_KEY));
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                User user = User.create(ugi);
+
+                if(user.isHBaseSecurityEnabled(hbaseConf)) {
+                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
+
+                    LOG.info("Obtained HBase tokens, adding to user credentials.");
+
+                    Credentials credential= proxyUser.getCredentials();
+                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                    // Closing the stream flushes it, also when write() fails.
+                    try (ObjectOutputStream out = new ObjectOutputStream(bao)) {
+                        credential.write(out);
+                    }
+                    return bao.toByteArray();
+                } else {
+                    throw new RuntimeException("Security is not enabled for HBase.");
+                }
+            } else {
+                throw new RuntimeException("Security is not enabled for Hadoop");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    @Override
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        //HBASE tokens are not renewable so we always have to get new ones.
+        populateCredentials(credentials, topologyConf);
+    }
+
+    @Override
+    protected String getCredentialKey(String configKey) {
+        return HBASE_CREDENTIALS + configKey;
+    }
+
+
+    /**
+     * Manual smoke test: fetches credentials, populates a subject and renews.
+     *
+     * @param args topology submitter principal, hbase principal, hbase keytab file
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        if (args.length < 3) {
+            throw new IllegalArgumentException(
+                    "Usage: AutoHBase <topology-submitter-principal> <hbase-principal> <hbase-keytab-file>");
+        }
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal e.g. storm-hbase@WITZEND.COM
+        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab e.g. /etc/security/keytabs/storm-hbase.keytab
+
+        AutoHBase autoHBase = new AutoHBase();
+        autoHBase.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHBase.populateCredentials(creds, conf);
+        LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHBase.populateSubject(s, creds);
+        LOG.info("Got a Subject " + s);
+
+        autoHBase.renew(creds, conf);
+        LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
new file mode 100644
index 0000000..4e0dcab
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
+/**
+ * This class provides util methods for storm-hbase connector communicating
+ * with secured HBase.
+ */
+public class HBaseSecurityUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
+
+    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
+    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
+    public static final String HBASE_CREDENTIALS_CONFIG_KEYS = "hbaseCredentialsConfigKeys";
+    // volatile: the field is read outside the lock below, so other threads must only ever
+    // observe a provider that has been fully set up.
+    private static volatile UserProvider legacyProvider = null;
+
+    /**
+     * Returns the {@link UserProvider} to use for talking to HBase.
+     *
+     * When security is enabled and AutoHBase is not listed under
+     * {@code TOPOLOGY_AUTO_CREDENTIALS}, a keytab based login is performed once per process
+     * and the resulting provider is shared. Otherwise a new provider is instantiated from
+     * the given configuration without logging in.
+     *
+     * @param conf        the conf; supplies the keytab file, the principal and the auto credentials list
+     * @param hbaseConfig the HBase configuration; the keytab and principal from conf are copied into it
+     * @return the user provider
+     * @throws IOException if the keytab based login fails
+     */
+    public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
+        //Allowing keytab based login for backward compatibility.
+        if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {
+            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+            //insure that if keytab is used only one login per process executed
+            UserProvider provider = legacyProvider;
+            if(provider == null) {
+                synchronized (HBaseSecurityUtil.class) {
+                    provider = legacyProvider;
+                    if(provider == null) {
+                        provider = UserProvider.instantiate(hbaseConfig);
+                        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                        if (keytab != null) {
+                            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                        }
+                        String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                        if (userName != null) {
+                            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+                        }
+                        provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
+                                InetAddress.getLocalHost().getCanonicalHostName());
+                        // Publish only after a successful login: a failed login must not
+                        // leave a provider cached that never logged in.
+                        legacyProvider = provider;
+                    }
+                }
+            }
+            return provider;
+        } else {
+            return UserProvider.instantiate(hbaseConfig);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
new file mode 100644
index 0000000..e1e8512
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFS.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.security;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.Config;
+import org.apache.storm.common.AbstractAutoCreds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
+import static org.apache.storm.hdfs.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
+
+/**
+ * Automatically get HDFS delegation tokens and push it to user's topology. The class
+ * assumes that HDFS configuration files are in your class path.
+ */
+public class AutoHDFS extends AbstractAutoCreds {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
+    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
+    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
+
+    private String hdfsKeyTab;
+    private String hdfsPrincipal;
+
+    /**
+     * Remembers the keytab file and the principal from the conf. Nothing is read unless
+     * both keys are present.
+     *
+     * @param conf the conf to read the keytab file and principal from
+     */
+    @Override
+    public void doPrepare(Map conf) {
+        if (!conf.containsKey(STORM_KEYTAB_FILE_KEY) || !conf.containsKey(STORM_USER_NAME_KEY)) {
+            return;
+        }
+        hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+        hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
+    }
+
+    /**
+     * @return the conf key under which the HDFS credential config keys are listed
+     */
+    @Override
+    protected String getConfigKeyString() {
+        return HdfsSecurityUtil.HDFS_CREDENTIALS_CONFIG_KEYS;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    /**
+     * Obtains the serialized credentials using a Hadoop configuration filled from the
+     * config section stored under {@code configKey}.
+     */
+    @Override
+    protected byte[] getHadoopCredentials(Map conf, String configKey) {
+        return getHadoopCredentials(conf, getHadoopConfiguration(conf, configKey));
+    }
+
+    @Override
+    protected byte[] getHadoopCredentials(Map conf) {
+        return getHadoopCredentials(conf, new Configuration());
+    }
+
+    private Configuration getHadoopConfiguration(Map topoConf, String configKey) {
+        Configuration configuration = new Configuration();
+        fillHadoopConfiguration(topoConf, configKey, configuration);
+        return configuration;
+    }
+
+    /**
+     * Logs in, then acquires HDFS delegation tokens as a proxy user created for the topology
+     * submitter and returns that user's serialized credentials.
+     *
+     * @param conf          the topology conf; supplies the topology submitter principal and,
+     *                      optionally, the HDFS URI (otherwise the default URI of the configuration is used)
+     * @param configuration the Hadoop configuration to use
+     * @return the serialized credentials
+     * @throws RuntimeException if security is not enabled or the tokens cannot be obtained
+     */
+    @SuppressWarnings("unchecked")
+    private byte[] getHadoopCredentials(Map conf, final Configuration configuration) {
+        try {
+            if (!UserGroupInformation.isSecurityEnabled()) {
+                throw new RuntimeException("Security is not enabled for HDFS");
+            }
+            login(configuration);
+
+            final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+            final URI nameNodeURI;
+            if (conf.containsKey(TOPOLOGY_HDFS_URI)) {
+                nameNodeURI = new URI(conf.get(TOPOLOGY_HDFS_URI).toString());
+            } else {
+                nameNodeURI = FileSystem.getDefaultUri(configuration);
+            }
+
+            final UserGroupInformation proxyUser =
+                    UserGroupInformation.createProxyUser(topologySubmitterUser, UserGroupInformation.getCurrentUser());
+
+            // The tokens are requested while running as the proxy user.
+            Credentials creds = proxyUser.doAs(new PrivilegedAction<Credentials>() {
+                @Override
+                public Credentials run() {
+                    try {
+                        FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
+                        Credentials credential = proxyUser.getCredentials();
+
+                        fileSystem.addDelegationTokens(hdfsPrincipal, credential);
+                        LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
+                        return credential;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+            try (ObjectOutputStream out = new ObjectOutputStream(serialized)) {
+                creds.write(out);
+                out.flush();
+            }
+            return serialized.toByteArray();
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    /**
+     * Renews every token found in the given credentials map.
+     *
+     * <p>For each (config key, credentials) pair returned by
+     * {@code getCredentials(credentials)}, the Hadoop configuration for that key is
+     * built and each token is renewed after a fresh login. If anything fails for a
+     * pair, a warning is logged and {@code populateCredentials} is called to fetch
+     * new tokens instead.
+     *
+     * @param credentials  the credentials map to read from (and repopulate on failure)
+     * @param topologyConf the topology configuration
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void doRenew(Map<String, String> credentials, Map topologyConf) {
+        for (Pair<String, Credentials> entry : getCredentials(credentials)) {
+            try {
+                Configuration hadoopConf = getHadoopConfiguration(topologyConf, entry.getFirst());
+                Collection<Token<? extends TokenIdentifier>> tokens = entry.getSecond().getAllTokens();
+
+                if (tokens == null || tokens.isEmpty()) {
+                    LOG.debug("No tokens found for credentials, skipping renewal.");
+                    continue;
+                }
+
+                for (Token token : tokens) {
+                    // Log in again before each renewal: another thread may have logged into
+                    // Hadoop with its own credentials in the meantime (e.g. AutoHBase may
+                    // also be among the Nimbus auto-credential plugins).
+                    login(hadoopConf);
+                    long expiration = token.renew(hadoopConf);
+                    LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
+                }
+            } catch (Exception e) {
+                LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
+                        "renewal period so attempting to get new tokens.", e);
+                populateCredentials(credentials, topologyConf);
+            }
+        }
+    }
+
+    private void login(Configuration configuration) throws IOException {
+        if (configuration.get(STORM_KEYTAB_FILE_KEY) == null) {
+            configuration.set(STORM_KEYTAB_FILE_KEY, hdfsKeyTab);
+        }
+        if (configuration.get(STORM_USER_NAME_KEY) == null) {
+            configuration.set(STORM_USER_NAME_KEY, hdfsPrincipal);
+        }
+        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+
+        LOG.info("Logged into hdfs with principal {}", configuration.get(STORM_USER_NAME_KEY));
+    }
+
+    @Override
+    protected String getCredentialKey(String configKey) {
+        return HDFS_CREDENTIALS + configKey;
+    }
+
+    /**
+     * Manual driver that exercises the plugin end to end: prepares it, populates
+     * credentials, populates a {@link Subject} from them and finally renews them,
+     * logging the result of each step.
+     *
+     * @param args {@code args[0]} topology submitter principal (with realm),
+     *             {@code args[1]} HDFS principal (with realm),
+     *             {@code args[2]} path of the keytab file
+     * @throws Exception if any step fails
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
+        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
+
+        AutoHDFS autoHDFS = new AutoHDFS();
+        autoHDFS.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHDFS.populateCredentials(creds, conf);
+        // The "{}" placeholder is required: without it SLF4J drops the argument.
+        LOG.info("Got HDFS credentials {}", autoHDFS.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHDFS.populateSubject(s, creds);
+        LOG.info("Got a Subject {}", s);
+
+        autoHDFS.renew(creds, conf);
+        LOG.info("renewed credentials {}", autoHDFS.getCredentials(creds));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
new file mode 100644
index 0000000..c0f3c79
--- /dev/null
+++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/HdfsSecurityUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.security;
+
+import org.apache.storm.security.auth.kerberos.AutoTGT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
+/**
+ * This class provides util methods for storm-hdfs connector communicating
+ * with secured HDFS.
+ */
+public class HdfsSecurityUtil {
+    public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
+    public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
+    public static final String HDFS_CREDENTIALS_CONFIG_KEYS = "hdfsCredentialsConfigKeys";
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
+    private static AtomicBoolean isLoggedIn = new AtomicBoolean();
+    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
+        //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
+        if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                (!(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName())) &&
+                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoTGT.class.getName())))) {
+            if (UserGroupInformation.isSecurityEnabled()) {
+                // compareAndSet added because of https://issues.apache.org/jira/browse/STORM-1535
+                if (isLoggedIn.compareAndSet(false, true)) {
+                    LOG.info("Logging in using keytab as AutoHDFS is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+                    String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                    if (keytab != null) {
+                        hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                    }
+                    String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                    if (userName != null) {
+                        hdfsConfig.set(STORM_USER_NAME_KEY, userName);
+                    }
+                    SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index fd6bfe5..ea49ad0 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -94,5 +94,10 @@
             <artifactId>caffeine</artifactId>
             <version>${caffeine.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-autocreds</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
deleted file mode 100644
index a2ca68e..0000000
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hbase.security;
-
-import org.apache.storm.Config;
-import org.apache.storm.security.INimbusCredentialPlugin;
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.ICredentialsRenewer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.*;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Automatically get hbase delegation tokens and push it to user's topology. The class
- * assumes that hadoop/hbase configuration files are in your class path.
- */
-public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
-
-    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
-    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
-    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
-
-    public String hbaseKeytab;
-    public String hbasePrincipal;
-
-    @Override
-    public void prepare(Map conf) {
-        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
-            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
-            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
-        try {
-            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
-        } catch (Exception e) {
-            LOG.error("Could not populate HBase credentials.", e);
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials) {
-        credentials.put(HBASE_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
-    }
-
-    /*
- *
- * @param credentials map with creds.
- * @return instance of org.apache.hadoop.security.Credentials.
- * this class's populateCredentials must have been called before.
- */
-    @SuppressWarnings("unchecked")
-    protected Object getCredentials(Map<String, String> credentials) {
-        Credentials credential = null;
-        if (credentials != null && credentials.containsKey(getCredentialKey())) {
-            try {
-                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
-                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
-
-                credential = new Credentials();
-                credential.readFields(in);
-                LOG.info("Got hbase credentials from credentials Map.");
-            } catch (Exception e) {
-                LOG.error("Could not obtain credentials from credentials map.", e);
-            }
-        }
-        return credential;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void updateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void populateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
-        try {
-            Object credential = getCredentials(credentials);
-            if (credential != null) {
-                subject.getPrivateCredentials().add(credential);
-                LOG.info("Hbase credentials added to subject.");
-            } else {
-                LOG.info("No credential found in credentials map.");
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to initialize and get UserGroupInformation.", e);
-        }
-    }
-
-    public void addTokensToUGI(Subject subject) {
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            try {
-                                UserGroupInformation.getCurrentUser().addToken(token);
-                                LOG.info("Added delegation tokens to UGI.");
-                            } catch (IOException e) {
-                                LOG.error("Exception while trying to add tokens to ugi", e);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
-        try {
-            final Configuration hbaseConf = HBaseConfiguration.create();
-            if(UserGroupInformation.isSecurityEnabled()) {
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
-                UserProvider provider = UserProvider.instantiate(hbaseConf);
-
-                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
-                hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
-                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
-
-                LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
-                UserGroupInformation.setConfiguration(hbaseConf);
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-
-                User user = User.create(ugi);
-
-                if(user.isHBaseSecurityEnabled(hbaseConf)) {
-                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
-
-                    LOG.info("Obtained HBase tokens, adding to user credentials.");
-
-                    Credentials credential= proxyUser.getCredentials();
-                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                    ObjectOutputStream out = new ObjectOutputStream(bao);
-                    credential.write(out);
-                    out.flush();
-                    out.close();
-                    return bao.toByteArray();
-                } else {
-                    throw new RuntimeException("Security is not enabled for HBase.");
-                }
-            } else {
-                throw new RuntimeException("Security is not enabled for Hadoop");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
-    @Override
-    public void renew(Map<String, String> credentials, Map topologyConf) {
-        //HBASE tokens are not renewable so we always have to get new ones.
-        populateCredentials(credentials, topologyConf);
-    }
-
-    protected String getCredentialKey() {
-        return HBASE_CREDENTIALS;
-    }
-
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEN.COM
-        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
-
-        AutoHBase autoHBase = new AutoHBase();
-        autoHBase.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHBase.populateCredentials(creds, conf);
-        LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
-
-        Subject s = new Subject();
-        autoHBase.populateSubject(s, creds);
-        LOG.info("Got a Subject " + s);
-
-        autoHBase.renew(creds, conf);
-        LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
deleted file mode 100644
index e579015..0000000
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
-
-/**
- * This class provides util methods for storm-hbase connector communicating
- * with secured HBase.
- */
-public class HBaseSecurityUtil {
-    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
-
-    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
-    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
-    private static  UserProvider legacyProvider = null;
-
-    public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
-        //Allowing keytab based login for backward compatibility.
-        if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
-                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {
-            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
-            //insure that if keytab is used only one login per process executed
-            if(legacyProvider == null) {
-                synchronized (HBaseSecurityUtil.class) {
-                    if(legacyProvider == null) {
-                        legacyProvider = UserProvider.instantiate(hbaseConfig);
-                        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-                        if (keytab != null) {
-                            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
-                        }
-                        String userName = (String) conf.get(STORM_USER_NAME_KEY);
-                        if (userName != null) {
-                            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
-                        }
-                        legacyProvider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
-                                InetAddress.getLocalHost().getCanonicalHostName());
-                    }
-                }
-            }
-            return legacyProvider;
-        } else {
-            return UserProvider.instantiate(hbaseConfig);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 8b3a792..9f16cf2 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -210,6 +210,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-autocreds</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 681d66a..395cced 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -34,7 +34,7 @@ import org.apache.storm.hdfs.common.AbstractHDFSWriter;
 import org.apache.storm.hdfs.common.NullPartitioner;
 import org.apache.storm.hdfs.common.Partitioner;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
deleted file mode 100644
index ff3f9cc..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hdfs.common.security;
-
-import org.apache.storm.Config;
-import org.apache.storm.security.INimbusCredentialPlugin;
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.ICredentialsRenewer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.*;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
-import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
-
-/**
- * Automatically get HDFS delegation tokens and push it to user's topology. The class
- * assumes that HDFS configuration files are in your class path.
- */
-public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
-    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
-    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
-
-    private String hdfsKeyTab;
-    private String hdfsPrincipal;
-
-    @Override
-    public void prepare(Map conf) {
-        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
-            this.hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-            this.hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
-        }
-    }
-
-    @Override
-    public void shutdown() {
-        //no op.
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
-        try {
-            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
-            LOG.info("HDFS tokens added to credentials map.");
-        } catch (Exception e) {
-            LOG.error("Could not populate HDFS credentials.", e);
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials) {
-        credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
-    }
-
-    /*
- *
- * @param credentials map with creds.
- * @return instance of org.apache.hadoop.security.Credentials.
- * this class's populateCredentials must have been called before.
- */
-    @SuppressWarnings("unchecked")
-    protected Credentials getCredentials(Map<String, String> credentials) {
-        Credentials credential = null;
-        if (credentials != null && credentials.containsKey(getCredentialKey())) {
-            try {
-                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
-                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
-
-                credential = new Credentials();
-                credential.readFields(in);
-            } catch (Exception e) {
-                LOG.error("Could not obtain credentials from credentials map.", e);
-            }
-        }
-        return credential;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void updateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void populateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-        addTokensToUGI(subject);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
-        try {
-            Credentials credential = getCredentials(credentials);
-            if (credential != null) {
-                subject.getPrivateCredentials().add(credential);
-                LOG.info("HDFS Credentials added to the subject.");
-            } else {
-                LOG.info("No credential found in credentials");
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to initialize and get UserGroupInformation.", e);
-        }
-    }
-
-    public void addTokensToUGI(Subject subject) {
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            try {
-                                UserGroupInformation.getCurrentUser().addToken(token);
-                                LOG.info("Added delegation tokens to UGI.");
-                            } catch (IOException e) {
-                                LOG.error("Exception while trying to add tokens to ugi", e);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public void renew(Map<String, String> credentials, Map topologyConf) {
-        try {
-            Credentials credential = getCredentials(credentials);
-            if (credential != null) {
-                Configuration configuration = new Configuration();
-                Collection<Token<? extends TokenIdentifier>> tokens = credential.getAllTokens();
-
-                if(tokens != null && tokens.isEmpty() == false) {
-                    for (Token token : tokens) {
-                        //We need to re-login some other thread might have logged into hadoop using
-                        // their credentials (e.g. AutoHBase might be also part of nimbu auto creds)
-                        login(configuration);
-                        long expiration = (Long) token.renew(configuration);
-                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
-                    }
-                } else {
-                    LOG.debug("No tokens found for credentials, skipping renewal.");
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
-                    "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials, topologyConf);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    protected byte[] getHadoopCredentials(Map conf) {
-        try {
-            if(UserGroupInformation.isSecurityEnabled()) {
-                final Configuration configuration = new Configuration();
-
-                login(configuration);
-
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-
-                final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
-                        : FileSystem.getDefaultUri(configuration);
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-
-                Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
-                    @Override
-                    public Object run() {
-                        try {
-                            FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
-                            Credentials credential= proxyUser.getCredentials();
-
-                            fileSystem.addDelegationTokens(hdfsPrincipal, credential);
-                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
-                            return credential;
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                });
-
-
-                ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                ObjectOutputStream out = new ObjectOutputStream(bao);
-
-                creds.write(out);
-                out.flush();
-                out.close();
-
-                return bao.toByteArray();
-            } else {
-                throw new RuntimeException("Security is not enabled for HDFS");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
-    private void login(Configuration configuration) throws IOException {
-        configuration.set(STORM_KEYTAB_FILE_KEY, this.hdfsKeyTab);
-        configuration.set(STORM_USER_NAME_KEY, this.hdfsPrincipal);
-        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
-
-        LOG.info("Logged into hdfs with principal {}", this.hdfsPrincipal);
-    }
-
-    protected String getCredentialKey() {
-        return HDFS_CREDENTIALS;
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
-        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
-
-        Configuration configuration = new Configuration();
-        AutoHDFS autoHDFS = new AutoHDFS();
-        autoHDFS.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHDFS.populateCredentials(creds, conf);
-        LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
-
-        Subject s = new Subject();
-        autoHDFS.populateSubject(s, creds);
-        LOG.info("Got a Subject "+ s);
-
-        autoHDFS.renew(creds, conf);
-        LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
deleted file mode 100644
index f380b38..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.common.security;
-
-import org.apache.storm.security.auth.kerberos.AutoTGT;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
-
-/**
- * This class provides util methods for storm-hdfs connector communicating
- * with secured HDFS.
- */
-public class HdfsSecurityUtil {
-    public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
-    public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
-
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
-    private static AtomicBoolean isLoggedIn = new AtomicBoolean();
-    public static void login(Map conf, Configuration hdfsConfig) throws IOException {
-        //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
-        if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
-                (!(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName())) &&
-                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoTGT.class.getName())))) {
-            if (UserGroupInformation.isSecurityEnabled()) {
-                // compareAndSet added because of https://issues.apache.org/jira/browse/STORM-1535
-                if (isLoggedIn.compareAndSet(false, true)) {
-                    LOG.info("Logging in using keytab as AutoHDFS is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
-                    String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-                    if (keytab != null) {
-                        hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
-                    }
-                    String userName = (String) conf.get(STORM_USER_NAME_KEY);
-                    if (userName != null) {
-                        hdfsConfig.set(STORM_USER_NAME_KEY, userName);
-                    }
-                    SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index fe72610..b956326 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.storm.hdfs.common.HdfsUtils;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a0122aed/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 43993c9..a863643 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.storm.Config;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.apache.storm.hdfs.security.HdfsSecurityUtil;
 import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;


[5/6] storm git commit: Merge branch 'STORM-2482-master' of https://github.com/arunmahadevan/storm into STORM-2482-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2482-master' of https://github.com/arunmahadevan/storm into STORM-2482-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f342d5c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f342d5c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f342d5c3

Branch: refs/heads/master
Commit: f342d5c3de859e49f7244f68186a15445495898f
Parents: 5b270d3 56310f9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 8 13:53:28 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 8 13:53:28 2017 +0900

----------------------------------------------------------------------
 docs/SECURITY.md                                |  18 +-
 docs/storm-hbase.md                             |  37 ++-
 docs/storm-hdfs.md                              |  43 ++-
 external/storm-autocreds/pom.xml                | 103 +++++++
 .../apache/storm/common/AbstractAutoCreds.java  | 248 ++++++++++++++++
 .../apache/storm/hbase/security/AutoHBase.java  | 179 ++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  73 +++++
 .../apache/storm/hdfs/security/AutoHDFS.java    | 212 ++++++++++++++
 .../storm/hdfs/security/HdfsSecurityUtil.java   |  68 +++++
 external/storm-hbase/pom.xml                    |   6 +-
 .../apache/storm/hbase/security/AutoHBase.java  | 243 ----------------
 .../storm/hbase/security/HBaseSecurityUtil.java |  72 -----
 external/storm-hdfs/pom.xml                     |   5 +
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   2 +-
 .../storm/hdfs/common/security/AutoHDFS.java    | 281 -------------------
 .../hdfs/common/security/HdfsSecurityUtil.java  |  67 -----
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |   2 +-
 .../apache/storm/hdfs/trident/HdfsState.java    |   2 +-
 pom.xml                                         |   2 +
 .../final-package/src/main/assembly/binary.xml  |   9 +-
 20 files changed, 977 insertions(+), 695 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f342d5c3/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-hbase/pom.xml
index d5294ec,4e763ab..9506918
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@@ -94,17 -93,10 +93,22 @@@
              <artifactId>caffeine</artifactId>
              <version>${caffeine.version}</version>
          </dependency>
+         <dependency>
+             <groupId>org.apache.storm</groupId>
+             <artifactId>storm-autocreds</artifactId>
+             <version>${project.version}</version>
+         </dependency>
      </dependencies>
 +    <build>
 +        <plugins>
 +            <plugin>
 +                <groupId>org.apache.maven.plugins</groupId>
 +                <artifactId>maven-checkstyle-plugin</artifactId>
 +                <!--Note - the version would be inherited-->
 +                <configuration>
 +                    <maxAllowedViolations>1572</maxAllowedViolations>
 +                </configuration>
 +            </plugin>
 +        </plugins>
 +    </build>
  </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/f342d5c3/external/storm-hdfs/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/f342d5c3/pom.xml
----------------------------------------------------------------------