Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/06 23:44:11 UTC

[01/11] storm git commit: making storm-hdfs bolt/states work with delegation tokens.

Repository: storm
Updated Branches:
  refs/heads/master c092cc5cd -> e71e2a3f7


making storm-hdfs bolt/states work with delegation tokens.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fefaf9bb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fefaf9bb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fefaf9bb

Branch: refs/heads/master
Commit: fefaf9bb1cab8890c19b538d8ea74c80ab30a853
Parents: 5a67957
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 10 17:50:32 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:15:38 2014 -0800

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |  5 +-
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    | 18 +------
 .../storm/hdfs/common/security/AutoHDFS.java    | 31 +++++++----
 .../hdfs/common/security/HdfsSecurityUtil.java  | 54 +++++++++++++++++---
 .../apache/storm/hdfs/trident/HdfsState.java    | 23 +--------
 5 files changed, 71 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
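
To see end to end how a topology would opt in to this behavior, here is a minimal, illustrative submitter-side sketch (the class name, topology name and wiring are hypothetical and not part of this commit): listing AutoHDFS under topology.auto-credentials tells the workers to unpack the pushed HDFS delegation tokens into their Subject, so the bolt/state no longer needs a keytab login. Nimbus must additionally be configured with nimbus.autocredential.plugins.classes, as documented later in this series.

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    import java.util.Arrays;

    public class HdfsTokenTopologySketch {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Spouts and an HdfsBolt would be wired up here (omitted in this sketch).

            Config conf = new Config();
            // Have workers pick up pushed HDFS delegation tokens instead of doing a keytab login.
            conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                    Arrays.asList("org.apache.storm.hdfs.common.security.AutoHDFS"));

            StormSubmitter.submitTopology("hdfs-token-demo", conf, builder.createTopology());
        }
    }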


http://git-wip-us.apache.org/repos/asf/storm/blob/fefaf9bb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 4539dcf..ac4f7df 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -103,10 +103,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
 
         try{
-            //if AutoHDFS is specified, do not attempt login.
-            if(!AutoHDFS.class.getName().equals(conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS))) {
-                HdfsSecurityUtil.login(conf, hdfsConfig);
-            }
+            HdfsSecurityUtil.login(conf, hdfsConfig);
             doPrepare(conf, topologyContext, collector);
             this.currentFile = createOutputFile();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fefaf9bb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 80944f6..7e2369d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -93,23 +93,7 @@ public class HdfsBolt extends AbstractHdfsBolt{
     @Override
     public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing HDFS Bolt...");
-        AccessControlContext context = AccessController.getContext();
-        Subject subject = Subject.getSubject(context);
-
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            UserGroupInformation.getCurrentUser().addToken(token);
-                            LOG.info("Added delegation tokens to UGI.");
-                        }
-                    }
-                }
-            }
-        }
+
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fefaf9bb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
index 5a2c1e9..ad9214e 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
@@ -35,6 +35,7 @@ import javax.security.auth.Subject;
 import javax.xml.bind.DatatypeConverter;
 import java.io.*;
 import java.net.URI;
+import java.security.PrivilegedAction;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -46,6 +47,7 @@ import java.util.Map;
 public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
     public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
+    public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
 
     @Override
     public void prepare(Map conf) {
@@ -61,6 +63,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     public void populateCredentials(Map<String, String> credentials, Map conf) {
         try {
             credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+            LOG.info("HDFS tokens added to credentials map.");
         } catch (Exception e) {
             LOG.warn("Could not populate HDFS credentials.", e);
         }
@@ -166,29 +169,39 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
 
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
-                Configuration configuration = new Configuration();
+                final Configuration configuration = new Configuration();
                 HdfsSecurityUtil.login(conf, configuration);
 
                 final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
                 final String hdfsUser = (String) conf.get(HdfsSecurityUtil.STORM_USER_NAME_KEY);
 
-                URI nameNodeURI = conf.containsKey(Config.TOPOLOGY_HDFS_URI) ? new URI(conf.get(Config.TOPOLOGY_HDFS_URI).toString())
+                final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
                         : FileSystem.getDefaultUri(configuration);
 
-                FileSystem fileSystem =  FileSystem.get(nameNodeURI, configuration);
-
                 UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-                UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                Credentials creds = (Credentials) proxyUser.doAs(new PrivilegedAction<Object>() {
+                    @Override
+                    public Object run() {
+                        try {
+                            FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
+                            Credentials credential= proxyUser.getCredentials();
 
-                Credentials credential= proxyUser.getCredentials();
+                            fileSystem.addDelegationTokens(hdfsUser, credential);
+                            return credential;
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
 
-                fileSystem.addDelegationTokens(hdfsUser, credential);
 
                 ByteArrayOutputStream bao = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bao);
 
-                credential.write(out);
+                creds.write(out);
                 out.flush();
                 out.close();
 
@@ -208,7 +221,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @SuppressWarnings("unchecked")
     public static void main(String[] args) throws Exception {
         Map conf = new HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM or storm/node.exmaple.com@WITZEND.COM
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
         conf.put(HdfsSecurityUtil.STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
         conf.put(HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fefaf9bb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
index 9540564..d8b7f5d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
@@ -17,12 +17,24 @@
  */
 package org.apache.storm.hdfs.common.security;
 
+import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class provides util methods for storm-hdfs connector communicating
@@ -32,17 +44,43 @@ public class HdfsSecurityUtil {
     public static final String STORM_KEYTAB_FILE_KEY = "hdfs.keytab.file";
     public static final String STORM_USER_NAME_KEY = "hdfs.kerberos.principal";
 
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
+
     public static void login(Map conf, Configuration hdfsConfig) throws IOException {
-        if (UserGroupInformation.isSecurityEnabled()) {
-            String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-            if (keytab != null) {
-              hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+        //Add all tokens
+        AccessControlContext context = AccessController.getContext();
+        Subject subject = Subject.getSubject(context);
+
+        if(subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            UserGroupInformation.getCurrentUser().addToken(token);
+                            LOG.info("Added delegation tokens to UGI.");
+                        }
+                    }
+                }
             }
-            String userName = (String) conf.get(STORM_USER_NAME_KEY);
-            if (userName != null) {
-              hdfsConfig.set(STORM_USER_NAME_KEY, userName);
+        }
+
+        //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
+        if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName()))) {
+            if (UserGroupInformation.isSecurityEnabled()) {
+                LOG.info("Logging in using keytab as AutoHDFS is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+                String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                if (keytab != null) {
+                    hdfsConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                }
+                String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                if (userName != null) {
+                    hdfsConfig.set(STORM_USER_NAME_KEY, userName);
+                }
+                SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
             }
-            SecurityUtil.login(hdfsConfig, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fefaf9bb/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 7763485..511d2b2 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -113,13 +113,9 @@ public class HdfsState implements State {
                 }
             }
             try{
-                //if AutoHDFS is specified, do not attempt login.
-                if(!AutoHDFS.class.getName().equals(conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS))) {
-                    HdfsSecurityUtil.login(conf, hdfsConfig);
-                }
+                HdfsSecurityUtil.login(conf, hdfsConfig);
                 doPrepare(conf, partitionIndex, numPartitions);
                 this.currentFile = createOutputFile();
-
             } catch (Exception e){
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
             }
@@ -182,23 +178,6 @@ public class HdfsState implements State {
         @Override
         void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
             LOG.info("Preparing HDFS Bolt...");
-            AccessControlContext context = AccessController.getContext();
-            Subject subject = Subject.getSubject(context);
-
-            if(subject != null) {
-                Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-                if (privateCredentials != null) {
-                    for (Credentials cred : privateCredentials) {
-                        Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                        if (allTokens != null) {
-                            for (Token<? extends TokenIdentifier> token : allTokens) {
-                                UserGroupInformation.getCurrentUser().addToken(token);
-                                LOG.info("Added delegation tokens to UGI.");
-                            }
-                        }
-                    }
-                }
-            }
 
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         }


[02/11] storm git commit: Moving AutoHDFS to storm-hdfs. Added code to leverage delegation tokens in storm-hdfs bolts/states.

Posted by bo...@apache.org.
Moving AutoHDFS to storm-hdfs. Added code to leverage delegation tokens in storm-hdfs bolts/states.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5a67957e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5a67957e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5a67957e

Branch: refs/heads/master
Commit: 5a67957efd3c874cd552e991d27eb9a8021129b3
Parents: 65e9f0c
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 3 14:41:24 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:15:38 2014 -0800

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   7 +-
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    |  27 ++
 .../storm/hdfs/common/security/AutoHDFS.java    | 231 ++++++++++++++++
 .../apache/storm/hdfs/trident/HdfsState.java    |  32 ++-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   4 +-
 .../storm/security/auth/hadoop/AutoHDFS.java    | 262 -------------------
 6 files changed, 297 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
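
The moving parts in this commit are easier to follow with a condensed, illustrative sketch of the credential round trip (the class and method names here are hypothetical; the real logic lives in AutoHDFS and the bolt/state doPrepare methods below): Nimbus serializes the Hadoop Credentials object holding the delegation tokens into a base64 string in the topology's credentials map, and the worker decodes it, attaches it to its JAAS Subject, and copies the tokens into the current UserGroupInformation so FileSystem calls can authenticate with them.

    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    import javax.security.auth.Subject;
    import javax.xml.bind.DatatypeConverter;
    import java.io.*;

    public class CredentialRoundTripSketch {

        // Nimbus side: Credentials (already holding delegation tokens) -> base64 string.
        static String encode(Credentials creds) throws IOException {
            ByteArrayOutputStream bao = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bao);
            creds.write(out);   // Credentials is a Hadoop Writable
            out.flush();
            out.close();
            return DatatypeConverter.printBase64Binary(bao.toByteArray());
        }

        // Worker side: base64 string -> Credentials, attached to the worker's Subject and UGI.
        static void decodeIntoSubject(String encoded, Subject subject) throws IOException {
            Credentials creds = new Credentials();
            creds.readFields(new ObjectInputStream(
                    new ByteArrayInputStream(DatatypeConverter.parseBase64Binary(encoded))));
            subject.getPrivateCredentials().add(creds);

            // Copy every delegation token into the current UGI so HDFS calls can use them.
            for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
                UserGroupInformation.getCurrentUser().addToken(token);
            }
        }
    }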


http://git-wip-us.apache.org/repos/asf/storm/blob/5a67957e/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index f260598..4539dcf 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -29,6 +30,7 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.AutoHDFS;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,7 +103,10 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
 
         try{
-            HdfsSecurityUtil.login(conf, hdfsConfig);
+            //if AutoHDFS is specified, do not attempt login.
+            if(!AutoHDFS.class.getName().equals(conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS))) {
+                HdfsSecurityUtil.login(conf, hdfsConfig);
+            }
             doPrepare(conf, topologyContext, collector);
             this.currentFile = createOutputFile();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a67957e/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index a416357..80944f6 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.format.RecordFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -33,10 +37,16 @@ import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.IOException;
 import java.net.URI;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.Principal;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.Set;
 
 public class HdfsBolt extends AbstractHdfsBolt{
     private static final Logger LOG = LoggerFactory.getLogger(HdfsBolt.class);
@@ -83,6 +93,23 @@ public class HdfsBolt extends AbstractHdfsBolt{
     @Override
     public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing HDFS Bolt...");
+        AccessControlContext context = AccessController.getContext();
+        Subject subject = Subject.getSubject(context);
+
+        if(subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            UserGroupInformation.getCurrentUser().addToken(token);
+                            LOG.info("Added delegation tokens to UGI.");
+                        }
+                    }
+                }
+            }
+        }
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a67957e/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
new file mode 100644
index 0000000..5a2c1e9
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.common.security;
+
+import backtype.storm.Config;
+import backtype.storm.security.INimbusCredentialPlugin;
+import backtype.storm.security.auth.IAutoCredentials;
+import backtype.storm.security.auth.ICredentialsRenewer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.*;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Automatically get HDFS delegation tokens and push it to user's topology. The class
+ * assumes that HDFS configuration files are in your class path.
+ */
+public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
+    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
+
+    @Override
+    public void prepare(Map conf) {
+        //no op.
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map conf) {
+        try {
+            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+        } catch (Exception e) {
+            LOG.warn("Could not populate HDFS credentials.", e);
+        }
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        //no op.
+    }
+
+    /*
+ *
+ * @param credentials map with creds.
+ * @return instance of org.apache.hadoop.security.Credentials.
+ * this class's populateCredentials must have been called before.
+ */
+    @SuppressWarnings("unchecked")
+    protected Credentials getCredentials(Map<String, String> credentials) {
+        Credentials credential = null;
+        if (credentials != null && credentials.containsKey(getCredentialKey())) {
+            try {
+                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
+                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
+
+                credential = new Credentials();
+                credential.readFields(in);
+            } catch (Exception e) {
+                LOG.warn("Could not obtain credentials from credentials map.", e);
+            }
+        }
+        return credential;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
+        try {
+            Credentials credential = getCredentials(credentials);
+            if (credential != null) {
+                subject.getPrivateCredentials().add(credential);
+                LOG.info("HDFS Credentials added to the subject.");
+            } else {
+                LOG.info("No credential found in credentials");
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        Credentials credential = getCredentials(credentials);
+        //maximum allowed expiration time until which tokens will keep renewing,
+        //currently set to 1 day.
+        final long MAX_ALLOWED_EXPIRATION_MILLIS = 24 * 60 * 60 * 1000;
+
+        try {
+            if (credential != null) {
+                Configuration configuration = new Configuration();
+                Collection<Token<? extends TokenIdentifier>> tokens = credential.getAllTokens();
+
+                if(tokens != null && tokens.isEmpty() == false) {
+                    for (Token token : tokens) {
+                        long expiration = (Long) token.renew(configuration);
+                        if (expiration < MAX_ALLOWED_EXPIRATION_MILLIS) {
+                            LOG.debug("expiration {} is less then MAX_ALLOWED_EXPIRATION_MILLIS {}, getting new tokens",
+                                    expiration, MAX_ALLOWED_EXPIRATION_MILLIS);
+                            populateCredentials(credentials, topologyConf);
+                        }
+                    }
+                } else {
+                    LOG.debug("No tokens found for credentials, skipping renewal.");
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
+                    "renewal period so attempting to get new tokens.", e);
+            populateCredentials(credentials);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf) {
+
+        try {
+            if(UserGroupInformation.isSecurityEnabled()) {
+                Configuration configuration = new Configuration();
+                HdfsSecurityUtil.login(conf, configuration);
+
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+                final String hdfsUser = (String) conf.get(HdfsSecurityUtil.STORM_USER_NAME_KEY);
+
+                URI nameNodeURI = conf.containsKey(Config.TOPOLOGY_HDFS_URI) ? new URI(conf.get(Config.TOPOLOGY_HDFS_URI).toString())
+                        : FileSystem.getDefaultUri(configuration);
+
+                FileSystem fileSystem =  FileSystem.get(nameNodeURI, configuration);
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                Credentials credential= proxyUser.getCredentials();
+
+                fileSystem.addDelegationTokens(hdfsUser, credential);
+
+                ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                ObjectOutputStream out = new ObjectOutputStream(bao);
+
+                credential.write(out);
+                out.flush();
+                out.close();
+
+                return bao.toByteArray();
+            } else {
+                throw new RuntimeException("Security is not enabled for HDFS");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    protected String getCredentialKey() {
+        return HDFS_CREDENTIALS;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM or storm/node.exmaple.com@WITZEND.COM
+        conf.put(HdfsSecurityUtil.STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
+        conf.put(HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
+
+        Configuration configuration = new Configuration();
+        AutoHDFS autoHDFS = new AutoHDFS();
+        autoHDFS.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHDFS.populateCredentials(creds, conf);
+        LOG.info("Got HDFS credentials", autoHDFS.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHDFS.populateSubject(s, creds);
+        LOG.info("Got a Subject "+ s);
+
+        autoHDFS.renew(creds, conf);
+        LOG.info("renewed credentials", autoHDFS.getCredentials(creds));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/5a67957e/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 67fff88..7763485 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.hdfs.trident;
 
+import backtype.storm.Config;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
 import org.apache.hadoop.conf.Configuration;
@@ -26,7 +27,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.AutoHDFS;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
@@ -40,9 +46,12 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
+import javax.security.auth.Subject;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.util.*;
 
 public class HdfsState implements State {
@@ -104,7 +113,10 @@ public class HdfsState implements State {
                 }
             }
             try{
-                HdfsSecurityUtil.login(conf, hdfsConfig);
+                //if AutoHDFS is specified, do not attempt login.
+                if(!AutoHDFS.class.getName().equals(conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS))) {
+                    HdfsSecurityUtil.login(conf, hdfsConfig);
+                }
                 doPrepare(conf, partitionIndex, numPartitions);
                 this.currentFile = createOutputFile();
 
@@ -170,6 +182,24 @@ public class HdfsState implements State {
         @Override
         void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
             LOG.info("Preparing HDFS Bolt...");
+            AccessControlContext context = AccessController.getContext();
+            Subject subject = Subject.getSubject(context);
+
+            if(subject != null) {
+                Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+                if (privateCredentials != null) {
+                    for (Credentials cred : privateCredentials) {
+                        Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                        if (allTokens != null) {
+                            for (Token<? extends TokenIdentifier> token : allTokens) {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            }
+                        }
+                    }
+                }
+            }
+
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5a67957e/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 1fbf7f0..b8a8815 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1053,8 +1053,8 @@
                 topology (normalize-topology total-storm-conf topology)
 
                 storm-cluster-state (:storm-cluster-state nimbus)]
-            (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
-              (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
+            (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
+              (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf)))
             (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) 
               (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
             (system-topology! total-storm-conf topology) ;; this validates the structure of the topology

http://git-wip-us.apache.org/repos/asf/storm/blob/5a67957e/storm-core/src/jvm/backtype/storm/security/auth/hadoop/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/hadoop/AutoHDFS.java b/storm-core/src/jvm/backtype/storm/security/auth/hadoop/AutoHDFS.java
deleted file mode 100644
index f61fb25..0000000
--- a/storm-core/src/jvm/backtype/storm/security/auth/hadoop/AutoHDFS.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.security.auth.hadoop;
-
-import backtype.storm.Config;
-import backtype.storm.security.INimbusCredentialPlugin;
-import backtype.storm.security.auth.IAutoCredentials;
-import backtype.storm.security.auth.ICredentialsRenewer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.Subject;
-import javax.xml.bind.DatatypeConverter;
-import java.io.*;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Automatically get HDFS delegation tokens and push it to user's topology. The class
- * assumes that HDFS configuration files are in your class path.
- */
-public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
-    private static final Logger LOG = LoggerFactory.getLogger(AutoHDFS.class);
-    public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
-
-    public void prepare(Map conf) {
-       //no op.
-    }
-
-    @SuppressWarnings("unchecked")
-    private byte[] getHDFSCredsWithDelegationToken(Map conf) throws Exception {
-
-        try {
-             // What we want to do is following:
-             //  if(UserGroupInformation.isSecurityEnabled) {
-             //      FileSystem fs = FileSystem.get(nameNodeURI, configuration, topologySubmitterUser);
-             //      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-             //      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-             //      Credentials credential= proxyUser.getCredentials();
-             //      fs.addDelegationToken(hdfsUser, credential);
-             // }
-             // and then return the credential object as a bytearray.
-             //
-             // Following are the minimum set of configuration that needs to be set,  users should have hdfs-site.xml
-             // and core-site.xml in the class path which should set these configuration.
-             // configuration.set("hadoop.security.authentication", "KERBEROS");
-             // configuration.set("dfs.namenode.kerberos.principal",
-             //                                "hdfs/zookeeper.witzend.com@WITZEND.COM");
-             // configuration.set("hadoop.security.kerberos.ticket.cache.path", "/tmp/krb5cc_1002");
-             // and the ticket cache must have the hdfs user's creds.
-
-            Class configurationClass = Class.forName("org.apache.hadoop.conf.Configuration");
-            Object configuration = configurationClass.newInstance();
-
-            //UserGroupInformation.isSecurityEnabled
-            final Class ugiClass = Class.forName("org.apache.hadoop.security.UserGroupInformation");
-            final Method isSecurityEnabledMethod = ugiClass.getDeclaredMethod("isSecurityEnabled");
-            boolean isSecurityEnabled = (Boolean)isSecurityEnabledMethod.invoke(null);
-
-            if(isSecurityEnabled) {
-                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_USER);
-                final String hdfsUser = (String) conf.get(Config.TOPOLOGY_HDFS_PRINCIPAL);
-
-                //FileSystem fs = FileSystem.get(nameNodeURI, configuration, topologySubmitterUser);
-                Class fileSystemClass = Class.forName("org.apache.hadoop.fs.FileSystem");
-
-                Object nameNodeURI = conf.containsKey(Config.TOPOLOGY_HDFS_URI) ? conf.get(Config.TOPOLOGY_HDFS_URI)
-                        : fileSystemClass.getMethod("getDefaultUri", configurationClass).invoke(null, configuration);
-                Method getMethod = fileSystemClass.getMethod("get", URI.class, configurationClass, String.class);
-                Object fileSystem = getMethod.invoke(null, nameNodeURI, configuration, topologySubmitterUser);
-
-                //UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-                Method getCurrentUserMethod = ugiClass.getMethod("getCurrentUser");
-                final Object ugi = getCurrentUserMethod.invoke(null);
-
-                //UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
-                Method createProxyUserMethod = ugiClass.getMethod("createProxyUser", String.class, ugiClass);
-                Object proxyUGI = createProxyUserMethod.invoke(null, topologySubmitterUser, ugi);
-
-                //Credentials credential= proxyUser.getCredentials();
-                Method getCredentialsMethod = ugiClass.getMethod("getCredentials");
-                Object credentials = getCredentialsMethod.invoke(proxyUGI);
-
-                //fs.addDelegationToken(hdfsUser, credential);
-                Class credentialClass = Class.forName("org.apache.hadoop.security.Credentials");
-                Method addDelegationTokensMethod = fileSystemClass.getMethod("addDelegationTokens", String.class,
-                        credentialClass);
-                addDelegationTokensMethod.invoke(fileSystem, hdfsUser, credentials);
-
-
-                ByteArrayOutputStream bao = new ByteArrayOutputStream();
-                ObjectOutputStream out = new ObjectOutputStream(bao);
-                Method writeMethod = credentialClass.getMethod("write", DataOutput.class);
-                writeMethod.invoke(credentials, out);
-                out.flush();
-                out.close();
-
-                return bao.toByteArray();
-            } else {
-                throw new RuntimeException("Security is not enabled for HDFS");
-            }
-        } catch (Exception ex) {
-            throw new RuntimeException("Failed to get delegation tokens." , ex);
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials, Map conf) {
-        if(conf.containsKey(Config.TOPOLOGY_HDFS_PRINCIPAL)) {
-            try {
-                credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary(getHDFSCredsWithDelegationToken(conf)));
-            } catch (Exception e) {
-                LOG.warn("Could not populate HDFS credentials.", e);
-            }
-        }
-    }
-
-    @Override
-    public void populateCredentials(Map<String, String> credentials) {
-        //no op.
-    }
-
-    /*
-     *
-     * @param credentials map with creds.
-     * @return instance of org.apache.hadoop.security.Credentials, if the Map has HDFS_CREDENTIALS.
-     * this class's populateCredentials must have been called before.
-     */
-    @SuppressWarnings("unchecked")
-    private static Object getHDFSCredential(Map<String, String> credentials) {
-        Object credential = null;
-        if (credentials != null && credentials.containsKey(HDFS_CREDENTIALS)) {
-            try {
-                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(HDFS_CREDENTIALS));
-                ByteArrayInputStream bai = new ByteArrayInputStream(credBytes);
-                ObjectInputStream in = new ObjectInputStream(bai);
-
-                Class credentialClass = Class.forName("org.apache.hadoop.security.Credentials");
-                credential = credentialClass.newInstance();
-                Method readMethod  = credentialClass.getMethod("readFields", DataInput.class);
-                readMethod.invoke(credential, in);
-            } catch (Exception e) {
-                LOG.warn("Could not obtain HDFS credentials from credentials map.", e);
-            }
-        }
-        return credential;
-    }
-
-    @Override
-    public void updateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-    }
-
-    @Override
-    public void populateSubject(Subject subject, Map<String, String> credentials) {
-        addCredentialToSubject(subject, credentials);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
-        try {
-            Object credential = getHDFSCredential(credentials);
-            if (credential != null) {
-                subject.getPrivateCredentials().add(credential);
-            } else {
-                LOG.info("No HDFS credential found in credentials");
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
-        }
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void renew(Map<String, String> credentials, Map topologyConf) {
-        Object credential = getHDFSCredential(credentials);
-        //maximum allowed expiration time until which tokens will keep renewing,
-        //currently set to 1 day.
-        final long MAX_ALLOWED_EXPIRATION_MILLIS = 24 * 60 * 60 * 1000;
-
-        /**
-         * We are trying to do the following :
-         * List<Token> tokens = credential.getAllTokens();
-         * for(Token token: tokens) {
-         *      long expiration = token.renew(configuration);
-         * }
-         */
-        try {
-            if (credential != null) {
-                Class configurationClass = Class.forName("org.apache.hadoop.conf.Configuration");
-                Object configuration = configurationClass.newInstance();
-
-                Class credentialClass = Class.forName("org.apache.hadoop.security.Credentials");
-                Class tokenClass = Class.forName("org.apache.hadoop.security.token.Token");
-
-                Method renewMethod = tokenClass.getMethod("renew", configurationClass);
-                Method getAllTokensMethod = credentialClass.getMethod("getAllTokens");
-
-                Collection<?> tokens = (Collection<?>) getAllTokensMethod.invoke(credential);
-
-                for (Object token : tokens) {
-                    long expiration = (Long) renewMethod.invoke(token, configuration);
-                    if(expiration < MAX_ALLOWED_EXPIRATION_MILLIS) {
-                        LOG.debug("expiration {} is less then MAX_ALLOWED_EXPIRATION_MILLIS {}, getting new tokens",
-                                expiration, MAX_ALLOWED_EXPIRATION_MILLIS);
-                        populateCredentials(credentials, topologyConf);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
-                    "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void main(String[] args) throws Exception {
-        Map conf = new java.util.HashMap();
-        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(Config.TOPOLOGY_HDFS_PRINCIPAL, args[1]); //with realm e.g. hdfs@WITZEND.COM
-
-        AutoHDFS autoHDFS = new AutoHDFS();
-        autoHDFS.prepare(conf);
-
-        Map<String,String> creds  = new HashMap<String, String>();
-        autoHDFS.populateCredentials(creds, conf);
-        LOG.info("Got HDFS credentials", AutoHDFS.getHDFSCredential(creds));
-
-        Subject s = new Subject();
-        autoHDFS.populateSubject(s, creds);
-        LOG.info("Got a Subject "+ s);
-
-        autoHDFS.renew(creds, conf);
-        LOG.info("renewed credentials", AutoHDFS.getHDFSCredential(creds));
-    }
-
-    @Override
-    public void shutdown() {
-
-    }
-}
-


[09/11] storm git commit: Revert " STORM-586: TridentKafkaEmitter should catch updateOffsetException."

Posted by bo...@apache.org.
Revert " STORM-586: TridentKafkaEmitter should catch updateOffsetException."

This reverts commit 65e9f0c814b2cddc772880042259b66194fd6fb7.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b272f35b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b272f35b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b272f35b

Branch: refs/heads/master
Commit: b272f35bed7688427b2bb88904f632ff7cd01d82
Parents: 5c69677
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Dec 12 10:43:06 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:43:06 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 +++----
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 +----
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +---------
 3 files changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b272f35b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
+                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]";
-                LOG.warn(msg);
-                throw new UpdateOffsetException(msg);
+                        "configured start offset time: [" + config.startOffsetTime + "]");
+                throw new UpdateOffsetException();
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/b272f35b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 5c366ec..1be7312 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,9 +17,6 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends FailedFetchException {
+public class UpdateOffsetException extends RuntimeException {
 
-    public UpdateOffsetException(String message) {
-        super(message);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b272f35b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..94bf134 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,7 +33,6 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -130,14 +129,7 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
-            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
-        }
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);


[04/11] storm git commit: adding documentation for AutoHbase and AutoHDFS usage.

Posted by bo...@apache.org.
adding documentation for AutoHbase and AutoHDFS usage.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0de45c2e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0de45c2e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0de45c2e

Branch: refs/heads/master
Commit: 0de45c2e5badf211d5e9149801862c8cae8ec36c
Parents: a37def8
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 10 20:44:58 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:18:18 2014 -0800

----------------------------------------------------------------------
 SECURITY.md                    | 21 +++++++----------
 external/storm-hbase/README.md | 46 ++++++++++++++++++++++++++++++++++++-
 external/storm-hdfs/README.md  | 43 ++++++++++++++++++++++++++++++++++
 3 files changed, 96 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0de45c2e/SECURITY.md
----------------------------------------------------------------------
diff --git a/SECURITY.md b/SECURITY.md
index a5cd264..40ea50e 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -330,22 +330,17 @@ This config file also needs to be owned by root and not have world or group writ
 ### Automatic Credentials Push and Renewal
 Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services.  Exposing this to all of the users can be a pain for them.
 To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed.
-These are controlled by the following configs. topology.auto-credentials is a list of java plugins that populate the credentials and unpack them on the worker side.
-On a kerberos secure cluster they should be set by default to point to backtype.storm.security.auth.kerberos.AutoTGT.  nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user.
+These are controlled by the following configs. topology.auto-credentials is a list of java plugins, all of which must implement the IAutoCredentials interface, that populate the credentials on the gateway 
+and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to backtype.storm.security.auth.kerberos.AutoTGT.  
+nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user.
 
 nimbus.credential.renewers.freq.secs controls how often the renewer will poll to see if anything needs to be renewed, but the default should be fine.
 
-#### Automatic HDFS credential push and renewal
-If your topology is going to use secure HDFS , your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. The nimbus need to start with 
-nimbus.autocredential.plugins.classes=backtype.storm.security.auth.hadoop.AutoHDFS and nimbus.credential.renewers.classes=backtype.storm.security.auth.hadoop.AutoHDFS. Your topology configuration
-should  have topology.auto-credentials=backtype.storm.security.auth.hadoop.AutoHDFS so workers can automatically get the credentials in the Subject.
-
-If nimbus did not have the above configuration you need to add it and then restart it. Ensure all the hadoop configuration files are present in the nimbus' classpath. Please read more about setting up
-secure hadoop on http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
-
-You also need to ensure that nimbus user is allowed to act as a super user and get delegation tokens on behalf of other users. To achieve this you need to follow configuration directions listed on this link
-http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html.
-
+In addition, Nimbus itself can be used to get credentials on behalf of the user submitting topologies. This can be configured using nimbus.autocredential.plugins.classes, which is a list 
+of fully qualified class names, all of which must implement INimbusCredentialPlugin.  Nimbus will invoke the populateCredentials method of all the configured implementations as part of topology
+submission. You should use this config with topology.auto-credentials and nimbus.credential.renewers.classes so the credentials can be populated on the worker side and nimbus can automatically renew
+them. Currently there are 2 examples of using this config, AutoHDFS and AutoHBase, which auto-populate hdfs and hbase delegation tokens for the topology submitter so they don't have to distribute keytabs
+on all possible worker hosts.
 
 ### Limits
 By default storm allows any sized topology to be submitted. But ZK and others have limitations on how big a topology can actually be.  The following configs allow you to limit the maximum size a topology can be.
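
For illustration (outside the patch itself), a minimal Java sketch of the topology-side usage described in the SECURITY.md changes above; the AutoTGT plugin name comes from that text, while the class name, method and topology argument are placeholders:

```java
import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;

public class AutoCredentialsSubmitSketch {
    // Hypothetical helper: 'topology' is assumed to be built elsewhere; only the
    // credential-related settings below are the point of this sketch.
    public static void submit(String name, StormTopology topology) throws Exception {
        Config conf = new Config();
        // Plugins that populate credentials on the gateway and unpack them on the workers.
        conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                Arrays.asList("backtype.storm.security.auth.kerberos.AutoTGT"));
        StormSubmitter.submitTopology(name, conf, topology);
    }
}
```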

http://git-wip-us.apache.org/repos/asf/storm/blob/0de45c2e/external/storm-hbase/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hbase/README.md b/external/storm-hbase/README.md
index a5f252d..61d67f3 100644
--- a/external/storm-hbase/README.md
+++ b/external/storm-hbase/README.md
@@ -34,6 +34,50 @@ ColumnList cols = new ColumnList();
 cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
 ```
 
+When the remote HBase is security enabled, a kerberos keytab and the corresponding principal name need to be
+provided for the storm-hbase connector. Specifically, the Config object passed into the topology should contain
+{("storm.keytab.file", "$keytab"), ("storm.kerberos.principal", "$principal")}. Example:
+
+```java
+Config config = new Config();
+...
+config.put("storm.keytab.file", "$keytab");
+config.put("storm.kerberos.principal", "$principle");
+StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
+```
+
+##Working with Secure HBASE using delegation tokens.
+If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase Master. 
+The approach described above requires that all potential worker hosts have "storm.keytab.file" on them. If you have 
+multiple topologies on a cluster, each with a different hbase user, you will have to create multiple keytabs and distribute
+them to all workers. Instead of doing that you could use the following approach:
+
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+The nimbus need to start with following configurations:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
+nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
+storm.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
+storm.kerberos.principal: "superuser@EXAMPLE.com"
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
+
+If nimbus did not have the above configuration, you need to add it and then restart it. Ensure the hbase configuration 
+files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present in nimbus's classpath. 
+Nimbus will use the keytab and principal specified in the config to authenticate with HBase master node. From then on for every
+topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
+topology submitter user. If topology was started with topology.auto-credentials set to AutoHBase, nimbus will push the
+delegation tokens to all the workers for your topology and the hbase bolt/state will authenticate with these tokens.
+
+As nimbus is impersonating topology submitter user, you need to ensure the user specified in storm.kerberos.principal 
+has permissions to acquire tokens on behalf of other users. To achieve this you need to follow configuration directions 
+listed on this link
+
+http://hbase.apache.org/book/security.html#security.rest.gateway
+
+You can read about setting up secure HBase here: http://hbase.apache.org/book/security.html.
+
 ### SimpleHBaseMapper
 `storm-hbase` includes a general purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
 tuples to both regular HBase columns as well as counter columns.
@@ -212,4 +256,4 @@ under the License.
 
  * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
  * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- 
\ No newline at end of file
+ 
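
For illustration (outside the patch itself), a minimal sketch of the topology-side configuration for the delegation token approach described in the README change above; only the AutoHBase plugin name comes from that text, the class and method names here are placeholders:

```java
import java.util.Arrays;

import backtype.storm.Config;

public class AutoHBaseTopologyConfSketch {
    // Topology-side configuration for the delegation token approach: only the plugin
    // class name is declared here; the hbase keytab and principal live in nimbus'
    // storm.yaml, not in the topology config.
    public static Config autoHBaseConf() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                Arrays.asList("org.apache.storm.hbase.security.AutoHBase"));
        return conf;
    }
}
```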

http://git-wip-us.apache.org/repos/asf/storm/blob/0de45c2e/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index be7b339..47e6db5 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -317,6 +317,49 @@ that of the bolts.
                 .addRotationAction(new MoveFileAction().toDestination("/dest2/"));
 ```
 
+##Working with Secure HDFS
+If your topology is going to interact with secure HDFS, your bolts/states need to be authenticated by the NameNode. We 
+currently have 2 options to support this:
+
+### Using HDFS delegation tokens 
+Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+The nimbus need to start with following configurations:
+
+nimbus.autocredential.plugins.classes : [org.apache.storm.hdfs.common.security.AutoHDFS"] 
+nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
+hdfs.kerberos.principal: "superuser@EXAMPLE.com" 
+topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default we will use value of "fs.defaultFS" property
+specified in hadoop's core-site.xml)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+
+If nimbus did not have the above configuration, you need to add it and then restart it. Ensure the hadoop configuration 
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are present in nimbus's classpath. 
+Nimbus will use the keytab and principal specified in the config to authenticate with Namenode. From then on for every
+topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
+topology submitter user. If topology was started with topology.auto-credentials set to AutoHDFS, nimbus will push the
+delegation tokens to all the workers for your topology and the hdfs bolt/state will authenticate with namenode using 
+these tokens.
+
+As nimbus is impersonating topology submitter user, you need to ensure the user specified in hdfs.kerberos.principal 
+has permissions to acquire tokens on behalf of other users. To achieve this you need to follow configuration directions 
+listed on this link
+http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
+
+You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
+
+### Using keytabs on all worker hosts
+If you have distributed the keytab files for hdfs user on all potential worker hosts then you can use this method. Your
+topology configuration should have:
+
+hdfs.keytab.file: "/path/to/keytab/"
+hdfs.kerberos.principal: "user@EXAMPLE.com"
+
+On worker hosts the bolt/trident-state code will use the keytab file with principal provided in the config to authenticate with 
+Namenode. This method is a little dangerous as you need to ensure all workers have the keytab file at the same location and you need
+to remember this as you bring up new hosts in the cluster.
 
 ## License
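
For the "Using keytabs on all worker hosts" option described in the README change above, a minimal sketch (outside the patch itself) of the corresponding topology configuration; the keytab path and principal are placeholders:

```java
import backtype.storm.Config;

public class HdfsKeytabConfSketch {
    // "Using keytabs on all worker hosts": the keytab is assumed to already exist at
    // the same path on every worker; path and principal below are placeholders.
    public static Config keytabConf() {
        Config conf = new Config();
        conf.put("hdfs.keytab.file", "/etc/security/keytabs/storm-hdfs-user.keytab");
        conf.put("hdfs.kerberos.principal", "user@EXAMPLE.com");
        return conf;
    }
}
```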
 


[08/11] storm git commit: STORM-444 removing unused imports and unintended changes.

Posted by bo...@apache.org.
STORM-444 removing unused imports and unintended changes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c696778
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c696778
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c696778

Branch: refs/heads/master
Commit: 5c696778c1c70e6cece4e09ee6434e2b6f96e16f
Parents: 88a8658
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Dec 12 10:41:22 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:41:22 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java    | 2 --
 .../src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java        | 1 -
 .../src/main/java/org/apache/storm/hdfs/trident/HdfsState.java    | 3 ++-
 3 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5c696778/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index ac4f7df..f260598 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.hdfs.bolt;
 
-import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -30,7 +29,6 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.AutoHDFS;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/storm/blob/5c696778/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 006ec25..a416357 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -83,7 +83,6 @@ public class HdfsBolt extends AbstractHdfsBolt{
     @Override
     public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing HDFS Bolt...");
-
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5c696778/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index cde0e56..67fff88 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -32,6 +32,7 @@ import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+
 import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +107,7 @@ public class HdfsState implements State {
                 HdfsSecurityUtil.login(conf, hdfsConfig);
                 doPrepare(conf, partitionIndex, numPartitions);
                 this.currentFile = createOutputFile();
+
             } catch (Exception e){
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
             }
@@ -168,7 +170,6 @@ public class HdfsState implements State {
         @Override
         void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
             LOG.info("Preparing HDFS Bolt...");
-
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         }
 


[10/11] storm git commit: Merge branch 'STORM-444' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-444

Posted by bo...@apache.org.
Merge branch 'STORM-444' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-444

STORM-444: Add AutoHDFS like credential fetching for HBase

Conflicts:
	external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
	external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
	external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b26e961
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b26e961
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b26e961

Branch: refs/heads/master
Commit: 5b26e9618942ae10b3f47312c71c4a9c9f916c21
Parents: c092cc5 b272f35
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jan 6 16:42:11 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jan 6 16:42:11 2015 -0600

----------------------------------------------------------------------
 SECURITY.md                                     |  21 +-
 external/storm-hbase/README.md                  |  49 +++-
 external/storm-hbase/pom.xml                    |  17 +-
 .../storm/hbase/bolt/AbstractHBaseBolt.java     |   9 +-
 .../apache/storm/hbase/security/AutoHBase.java  | 243 ++++++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  32 ++-
 .../storm/hbase/trident/state/HBaseState.java   |   9 +-
 external/storm-hdfs/README.md                   |  45 +++
 .../storm/hdfs/common/security/AutoHDFS.java    | 281 +++++++++++++++++++
 .../hdfs/common/security/HdfsSecurityUtil.java  |  30 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  16 --
 .../storm/security/auth/hadoop/AutoHDFS.java    | 262 -----------------
 12 files changed, 700 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5b26e961/SECURITY.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/5b26e961/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------


[03/11] storm git commit: Moving AutoHDFS and AutoHBase to storm-hdfs and storm-hbase. Making storm-hbase and storm-hdfs bolts/states work with delegation tokens.

Posted by bo...@apache.org.
Moving AutoHDFS and AutoHBase to storm-hdfs and storm-hbase. Making storm-hbase and storm-hdfs bolts/states work with delegation tokens.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a37def89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a37def89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a37def89

Branch: refs/heads/master
Commit: a37def89645f62a5df88ac6b46de3743703bb051
Parents: fefaf9b
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 10 17:51:48 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:16:44 2014 -0800

----------------------------------------------------------------------
 external/storm-hbase/pom.xml                    |  17 +-
 .../storm/hbase/bolt/AbstractHBaseBolt.java     |   9 +-
 .../apache/storm/hbase/security/AutoHBase.java  | 214 +++++++++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  58 ++++-
 .../storm/hbase/trident/state/HBaseState.java   |   9 +-
 5 files changed, 294 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index fb9aaf1..b2f8f44 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -36,7 +36,7 @@
     </developers>
 
     <properties>
-        <hbase.version>0.98.1-hadoop2</hbase.version>
+        <hbase.version>0.98.4-hadoop2</hbase.version>
         <hdfs.version>2.2.0</hdfs.version>
     </properties>
 
@@ -49,6 +49,21 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>${hbase.version}</version>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 5f6621b..d814117 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.hbase.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
@@ -28,6 +29,7 @@ import org.apache.storm.hbase.common.HBaseClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 // TODO support more configuration options, for now we're defaulting to the hbase-*.xml files found on the classpath
@@ -57,6 +59,7 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
         if(conf == null) {
             throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
         }
+
         if(conf.get("hbase.rootdir") == null) {
             LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
         }
@@ -64,6 +67,10 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
             hbConfig.set(key, String.valueOf(conf.get(key)));
         }
 
-        this.hBaseClient = new HBaseClient(conf, hbConfig, tableName);
+        //hack for backward compatibility, we need to pass TOPOLOGY_AUTO_CREDENTIALS to hbase conf
+        //the conf instance is an instance of a persistent map, so make a copy.
+        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+        this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
new file mode 100644
index 0000000..85f7683
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.security;
+
+import backtype.storm.Config;
+import backtype.storm.security.INimbusCredentialPlugin;
+import backtype.storm.security.auth.IAutoCredentials;
+import backtype.storm.security.auth.ICredentialsRenewer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Automatically get hbase delegation tokens and push them to the user's topology. The class
+ * assumes that hadoop/hbase configuration files are in your class path.
+ */
+public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
+
+    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
+    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
+    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
+
+    @Override
+    public void prepare(Map conf) {
+        //no op.
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map conf) {
+        try {
+            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+        } catch (Exception e) {
+            LOG.warn("Could not populate HBase credentials.", e);
+        }
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        //no op.
+    }
+
+    /*
+ *
+ * @param credentials map with creds.
+ * @return instance of org.apache.hadoop.security.Credentials.
+ * this class's populateCredentials must have been called before.
+ */
+    @SuppressWarnings("unchecked")
+    protected Object getCredentials(Map<String, String> credentials) {
+        Credentials credential = null;
+        if (credentials != null && credentials.containsKey(getCredentialKey())) {
+            try {
+                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
+                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
+
+                credential = new Credentials();
+                credential.readFields(in);
+                LOG.info("Got hbase credentials from credentials Map.");
+            } catch (Exception e) {
+                LOG.warn("Could not obtain credentials from credentials map.", e);
+            }
+        }
+        return credential;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
+        try {
+            Object credential = getCredentials(credentials);
+            if (credential != null) {
+                subject.getPrivateCredentials().add(credential);
+                LOG.info("Hbase credentials added to subject.");
+            } else {
+                LOG.info("No credential found in credentials map.");
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf) {
+        try {
+            final Configuration hbaseConf = HBaseConfiguration.create();
+            if(UserGroupInformation.isSecurityEnabled()) {
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                UserProvider provider = UserProvider.instantiate(hbaseConf);
+
+                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, conf.get(HBASE_KEYTAB_FILE_KEY).toString());
+                hbaseConf.set(HBASE_PRINCIPAL_KEY, conf.get(HBASE_PRINCIPAL_KEY).toString());
+                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
+
+                LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
+                UserGroupInformation.setConfiguration(hbaseConf);
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                User user = User.create(ugi);
+
+                if(user.isHBaseSecurityEnabled(hbaseConf)) {
+                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
+
+                    LOG.info("Obtained HBase tokens, adding to user credentials.");
+
+                    Credentials credential= proxyUser.getCredentials();
+                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                    ObjectOutputStream out = new ObjectOutputStream(bao);
+                    credential.write(out);
+                    out.flush();
+                    out.close();
+                    return bao.toByteArray();
+                } else {
+                    throw new RuntimeException("Security is not enabled for HBase.");
+                }
+            } else {
+                throw new RuntimeException("Security is not enabled for Hadoop");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        //HBASE tokens are not renewable so we always have to get new ones.
+        populateCredentials(credentials, topologyConf);
+    }
+
+    protected String getCredentialKey() {
+        return HBASE_CREDENTIALS;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEN.COM
+        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
+
+        AutoHBase autoHBase = new AutoHBase();
+        autoHBase.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHBase.populateCredentials(creds, conf);
+        LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHBase.populateSubject(s, creds);
+        LOG.info("Got a Subject " + s);
+
+        autoHBase.renew(creds, conf);
+        LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index bb53478..d941b66 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -17,35 +17,73 @@
  */
 package org.apache.storm.hbase.security;
 
+import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class provides util methods for storm-hbase connector communicating
  * with secured HBase.
  */
 public class HBaseSecurityUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
+
     public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
     public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
 
     public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
-        UserProvider provider = UserProvider.instantiate(hbaseConfig);
-        if (UserGroupInformation.isSecurityEnabled()) {
-            String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-            if (keytab != null) {
-                hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+        AccessControlContext context = AccessController.getContext();
+        Subject subject = Subject.getSubject(context);
+
+        if (subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            UserGroupInformation.getCurrentUser().addToken(token);
+                            LOG.info("Added Hbase delegation tokens to UGI.");
+                        }
+                    }
+                }
             }
-            String userName = (String) conf.get(STORM_USER_NAME_KEY);
-            if (userName != null) {
-                hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+        }
+
+        //Allowing keytab based login for backward compatibility.
+        UserProvider provider = UserProvider.instantiate(hbaseConfig);
+        if (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName()))) {
+            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+            if (UserGroupInformation.isSecurityEnabled()) {
+                String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                if (keytab != null) {
+                    hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                }
+                String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                if (userName != null) {
+                    hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+                }
+                provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
+                        InetAddress.getLocalHost().getCanonicalHostName());
             }
-            provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY, 
-                InetAddress.getLocalHost().getCanonicalHostName());
         }
         return provider;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 7b31fad..20ff0d3 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.hbase.trident.state;
 
+import backtype.storm.Config;
 import backtype.storm.topology.FailedException;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
@@ -35,6 +36,7 @@ import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -109,7 +111,12 @@ public class HBaseState implements State {
             }
         }
 
-        this.hBaseClient = new HBaseClient(conf, hbConfig, options.tableName);
+        //hack for backward compatibility, we need to pass TOPOLOGY_AUTO_CREDENTIALS to hbase conf
+        //the conf instance is an instance of a persistent map, so make a copy.
+        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+
+        this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, options.tableName);
     }
 
     @Override


[05/11] storm git commit: Moving the token-adding logic into populateSubject so that on renewal the tokens are automatically added to the UGI. Logging into hadoop again before the renewal attempt to ensure other hadoop log-ins do not interfere.

Posted by bo...@apache.org.
Moving the token-adding logic into populateSubject so that on renewal the tokens are automatically added to the UGI. Logging into hadoop again before the renewal attempt to ensure other hadoop log-ins do not interfere.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e590a727
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e590a727
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e590a727

Branch: refs/heads/master
Commit: e590a7272bbb4838cbc2a9acd524e34094af3d27
Parents: b7b20fc
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 11 20:13:20 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:18:22 2014 -0800

----------------------------------------------------------------------
 external/storm-hbase/README.md                  | 11 ++-
 .../apache/storm/hbase/PersistentWordCount.java | 92 ++++++++++++++++++++
 .../apache/storm/hbase/security/AutoHBase.java  | 53 ++++++++---
 .../storm/hbase/security/HBaseSecurityUtil.java | 18 ----
 external/storm-hdfs/README.md                   |  8 +-
 .../storm/hdfs/common/security/AutoHDFS.java    | 81 ++++++++++++-----
 .../hdfs/common/security/HdfsSecurityUtil.java  | 19 ----
 7 files changed, 204 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hbase/README.md b/external/storm-hbase/README.md
index 61d67f3..81d351a 100644
--- a/external/storm-hbase/README.md
+++ b/external/storm-hbase/README.md
@@ -47,7 +47,7 @@ StormSubmitter.submitTopology("$topologyName", config, builder.createTopology())
 ```
 
 ##Working with Secure HBASE using delegation tokens.
-If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase Master. 
+If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase. 
 The approach described above requires that all potential worker hosts have "storm.keytab.file" on them. If you have 
 multiple topologies on a cluster, each with a different hbase user, you will have to create multiple keytabs and distribute
 them to all workers. Instead of doing that you could use the following approach:
@@ -57,15 +57,18 @@ The nimbus need to start with following configurations:
 
 nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
 nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
-storm.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
-storm.kerberos.principal: "superuser@EXAMPLE.com"
+hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.)
+hbase.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and cannot be renewed, 
+if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml then you should ensure this value is 
+at least 1 hour less than that.)
 
 Your topology configuration should have:
 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
 
 If nimbus did not have the above configuration, you need to add it and then restart it. Ensure the hbase configuration 
 files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present in nimbus's classpath. 
-Nimbus will use the keytab and principal specified in the config to authenticate with HBase master node. From then on for every
+Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on for every
 topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
 topology submitter user. If topology was started with topology.auto-credentials set to AutoHBase, nimbus will push the
 delegation tokens to all the workers for your topology and the hbase bolt/state will authenticate with these tokens.

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
new file mode 100644
index 0000000..94a8ff5
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hbase;
+
+import org.apache.storm.hbase.bolt.HBaseBolt;
+import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
+import org.apache.storm.hbase.security.HBaseSecurityUtil;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.storm.hbase.topology.WordCounter;
+import org.apache.storm.hbase.topology.WordSpout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        Map<String, Object> hbConf = new HashMap<String, Object>();
+        if(args.length > 0){
+            hbConf.put("hbase.rootdir", args[0]);
+        }
+        config.put("hbase.conf", hbConf);
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+                .withRowKeyField("word")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withColumnFamily("cf");
+
+        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
+                .withConfigKey("hbase.conf");
+
+
+        // wordSpout ==> countBolt ==> HBaseBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 1) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 2) {
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else if (args.length == 4) {
+            System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] + 
+                ", principal name: " + args[3] + ", toplogy name: " + args[1]);
+            hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
+            hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
+            config.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
index 85f7683..02c81bb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -29,19 +29,19 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
 import javax.xml.bind.DatatypeConverter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.*;
 import java.net.InetAddress;
-import java.security.PrivilegedAction;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Automatically get hbase delegation tokens and push it to user's topology. The class
@@ -54,9 +54,15 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
     public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
 
+    public String hbaseKeytab;
+    public String hbasePrincipal;
+
     @Override
     public void prepare(Map conf) {
-        //no op.
+        if(conf.containsKey(HBASE_KEYTAB_FILE_KEY) && conf.containsKey(HBASE_PRINCIPAL_KEY)) {
+            hbaseKeytab = (String) conf.get(HBASE_KEYTAB_FILE_KEY);
+            hbasePrincipal = (String) conf.get(HBASE_PRINCIPAL_KEY);
+        }
     }
 
     @Override
@@ -69,13 +75,13 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
         try {
             credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
         } catch (Exception e) {
-            LOG.warn("Could not populate HBase credentials.", e);
+            LOG.error("Could not populate HBase credentials.", e);
         }
     }
 
     @Override
     public void populateCredentials(Map<String, String> credentials) {
-        //no op.
+        credentials.put(HBASE_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
     }
 
     /*
@@ -96,7 +102,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
                 credential.readFields(in);
                 LOG.info("Got hbase credentials from credentials Map.");
             } catch (Exception e) {
-                LOG.warn("Could not obtain credentials from credentials map.", e);
+                LOG.error("Could not obtain credentials from credentials map.", e);
             }
         }
         return credential;
@@ -108,6 +114,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     @Override
     public void updateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     /**
@@ -116,6 +123,7 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
     @Override
     public void populateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     @SuppressWarnings("unchecked")
@@ -129,7 +137,28 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
                 LOG.info("No credential found in credentials map.");
             }
         } catch (Exception e) {
-            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    public void addTokensToUGI(Subject subject) {
+        if(subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -142,8 +171,8 @@ public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbus
 
                 UserProvider provider = UserProvider.instantiate(hbaseConf);
 
-                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, conf.get(HBASE_KEYTAB_FILE_KEY).toString());
-                hbaseConf.set(HBASE_PRINCIPAL_KEY, conf.get(HBASE_PRINCIPAL_KEY).toString());
+                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, hbaseKeytab);
+                hbaseConf.set(HBASE_PRINCIPAL_KEY, hbasePrincipal);
                 provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
 
                 LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index d941b66..36a4b89 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -49,24 +49,6 @@ public class HBaseSecurityUtil {
     public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
 
     public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
-        AccessControlContext context = AccessController.getContext();
-        Subject subject = Subject.getSubject(context);
-
-        if (subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            UserGroupInformation.getCurrentUser().addToken(token);
-                            LOG.info("Added Hbase delegation tokens to UGI.");
-                        }
-                    }
-                }
-            }
-        }
-
         //Allowing keytab based login for backward compatibility.
         UserProvider provider = UserProvider.instantiate(hbaseConfig);
         if (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||

http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 47e6db5..b37af7e 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -325,10 +325,12 @@ currently have 2 options to support this:
 Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
 The nimbus need to start with following configurations:
 
-nimbus.autocredential.plugins.classes : [org.apache.storm.hdfs.common.security.AutoHDFS"] 
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
 nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"] 
 hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
 hdfs.kerberos.principal: "superuser@EXAMPLE.com" 
+nimbus.credential.renewers.freq.secs : 82800 (23 hours, hdfs tokens need to be renewed every 24 hours so this value should be
+less than 24 hours.)
 topology.hdfs.uri:"hdfs://host:port" (This is an optional config, by default we will use value of "fs.defaultFS" property
 specified in hadoop's core-site.xml)
 
@@ -351,8 +353,8 @@ http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superuse
 You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
 
 ### Using keytabs on all worker hosts
-If you have distributed the keytab files for hdfs user on all potential worker hosts then you can use this method. Your
-topology configuration should have:
+If you have distributed the keytab files for the hdfs user on all potential worker hosts then you can use this method. You should specify an 
+hdfs config key using the method HdfsBolt/State.withConfigKey("somekey"), and the value map of this key should have the following 2 properties:
 
 hdfs.keytab.file: "/path/to/keytab/"
 hdfs.kerberos.principal: "user@EXAMPLE.com"

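For illustration, here is a sketch of the keytab-on-all-worker-hosts approach described above. Assumptions: the builder method is spelled withConfigKey (camelCase, matching HBaseBolt.withConfigKey in storm-hbase), and the config key name, filesystem URI, path, keytab and principal are all placeholders:

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.Config;
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

    public class HdfsKeytabConfSketch {
        public static HdfsBolt buildBolt(Config config) {
            // The two properties named in the README, stored under a topology config key.
            Map<String, Object> hdfsConf = new HashMap<String, Object>();
            hdfsConf.put("hdfs.keytab.file", "/path/to/keytab/");        // placeholder
            hdfsConf.put("hdfs.kerberos.principal", "user@EXAMPLE.com"); // placeholder
            config.put("hdfs.config", hdfsConf);                         // key name is arbitrary

            return new HdfsBolt()
                    .withFsUrl("hdfs://namenode.example.com:8020")       // placeholder URI
                    .withFileNameFormat(new DefaultFileNameFormat().withPath("/storm/"))
                    .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
                    .withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB))
                    .withSyncPolicy(new CountSyncPolicy(1000))
                    .withConfigKey("hdfs.config");                       // assumed camelCase spelling
        }
    }

The intent, per the README text, is that the bolt hands this map to HdfsSecurityUtil.login so a keytab login can be performed on each worker host.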
http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
index ad9214e..faa21ac 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/AutoHDFS.java
@@ -25,6 +25,7 @@ import backtype.storm.security.auth.ICredentialsRenewer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -39,6 +40,10 @@ import java.security.PrivilegedAction;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY;
+import static org.apache.storm.hdfs.common.security.HdfsSecurityUtil.STORM_USER_NAME_KEY;
 
 /**
  * Automatically get HDFS delegation tokens and push it to user's topology. The class
@@ -49,9 +54,15 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     public static final String HDFS_CREDENTIALS = "HDFS_CREDENTIALS";
     public static final String TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
 
+    private String hdfsKeyTab;
+    private String hdfsPrincipal;
+
     @Override
     public void prepare(Map conf) {
-        //no op.
+        if(conf.containsKey(STORM_KEYTAB_FILE_KEY) && conf.containsKey(STORM_USER_NAME_KEY)) {
+            this.hdfsKeyTab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+            this.hdfsPrincipal = (String) conf.get(STORM_USER_NAME_KEY);
+        }
     }
 
     @Override
@@ -65,13 +76,13 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
             credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
             LOG.info("HDFS tokens added to credentials map.");
         } catch (Exception e) {
-            LOG.warn("Could not populate HDFS credentials.", e);
+            LOG.error("Could not populate HDFS credentials.", e);
         }
     }
 
     @Override
     public void populateCredentials(Map<String, String> credentials) {
-        //no op.
+        credentials.put(HDFS_CREDENTIALS, DatatypeConverter.printBase64Binary("dummy place holder".getBytes()));
     }
 
     /*
@@ -91,7 +102,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                 credential = new Credentials();
                 credential.readFields(in);
             } catch (Exception e) {
-                LOG.warn("Could not obtain credentials from credentials map.", e);
+                LOG.error("Could not obtain credentials from credentials map.", e);
             }
         }
         return credential;
@@ -103,6 +114,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @Override
     public void updateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     /**
@@ -111,6 +123,7 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @Override
     public void populateSubject(Subject subject, Map<String, String> credentials) {
         addCredentialToSubject(subject, credentials);
+        addTokensToUGI(subject);
     }
 
     @SuppressWarnings("unchecked")
@@ -124,7 +137,28 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                 LOG.info("No credential found in credentials");
             }
         } catch (Exception e) {
-            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+            LOG.error("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    public void addTokensToUGI(Subject subject) {
+        if(subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            try {
+                                UserGroupInformation.getCurrentUser().addToken(token);
+                                LOG.info("Added delegation tokens to UGI.");
+                            } catch (IOException e) {
+                                LOG.error("Exception while trying to add tokens to ugi", e);
+                            }
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -134,24 +168,19 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     @Override
     @SuppressWarnings("unchecked")
     public void renew(Map<String, String> credentials, Map topologyConf) {
-        Credentials credential = getCredentials(credentials);
-        //maximum allowed expiration time until which tokens will keep renewing,
-        //currently set to 1 day.
-        final long MAX_ALLOWED_EXPIRATION_MILLIS = 24 * 60 * 60 * 1000;
-
         try {
+            Credentials credential = getCredentials(credentials);
             if (credential != null) {
                 Configuration configuration = new Configuration();
                 Collection<Token<? extends TokenIdentifier>> tokens = credential.getAllTokens();
 
                 if(tokens != null && tokens.isEmpty() == false) {
                     for (Token token : tokens) {
+                        //We need to re-login because some other thread might have logged into hadoop using
+                        // its own credentials (e.g. AutoHBase might also be part of nimbus auto creds)
+                        login(configuration);
                         long expiration = (Long) token.renew(configuration);
-                        if (expiration < MAX_ALLOWED_EXPIRATION_MILLIS) {
-                            LOG.debug("expiration {} is less then MAX_ALLOWED_EXPIRATION_MILLIS {}, getting new tokens",
-                                    expiration, MAX_ALLOWED_EXPIRATION_MILLIS);
-                            populateCredentials(credentials, topologyConf);
-                        }
+                        LOG.info("HDFS delegation token renewed, new expiration time {}", expiration);
                     }
                 } else {
                     LOG.debug("No tokens found for credentials, skipping renewal.");
@@ -160,20 +189,19 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         } catch (Exception e) {
             LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond " +
                     "renewal period so attempting to get new tokens.", e);
-            populateCredentials(credentials);
+            populateCredentials(credentials, topologyConf);
         }
     }
 
     @SuppressWarnings("unchecked")
     protected byte[] getHadoopCredentials(Map conf) {
-
         try {
             if(UserGroupInformation.isSecurityEnabled()) {
                 final Configuration configuration = new Configuration();
-                HdfsSecurityUtil.login(conf, configuration);
+
+                login(configuration);
 
                 final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
-                final String hdfsUser = (String) conf.get(HdfsSecurityUtil.STORM_USER_NAME_KEY);
 
                 final URI nameNodeURI = conf.containsKey(TOPOLOGY_HDFS_URI) ? new URI(conf.get(TOPOLOGY_HDFS_URI).toString())
                         : FileSystem.getDefaultUri(configuration);
@@ -189,7 +217,8 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
                             FileSystem fileSystem = FileSystem.get(nameNodeURI, configuration);
                             Credentials credential= proxyUser.getCredentials();
 
-                            fileSystem.addDelegationTokens(hdfsUser, credential);
+                            fileSystem.addDelegationTokens(hdfsPrincipal, credential);
+                            LOG.info("Delegation tokens acquired for user {}", topologySubmitterUser);
                             return credential;
                         } catch (IOException e) {
                             throw new RuntimeException(e);
@@ -214,6 +243,14 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
         }
     }
 
+    private void login(Configuration configuration) throws IOException {
+        configuration.set(STORM_KEYTAB_FILE_KEY, this.hdfsKeyTab);
+        configuration.set(STORM_USER_NAME_KEY, this.hdfsPrincipal);
+        SecurityUtil.login(configuration, STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY);
+
+        LOG.info("Logged into hdfs with principal {}", this.hdfsPrincipal);
+    }
+
     protected String getCredentialKey() {
         return HDFS_CREDENTIALS;
     }
@@ -222,8 +259,8 @@ public class AutoHDFS implements IAutoCredentials, ICredentialsRenewer, INimbusC
     public static void main(String[] args) throws Exception {
         Map conf = new HashMap();
         conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
-        conf.put(HdfsSecurityUtil.STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
-        conf.put(HdfsSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
+        conf.put(STORM_USER_NAME_KEY, args[1]); //with realm e.g. hdfs@WITZEND.COM
+        conf.put(STORM_KEYTAB_FILE_KEY, args[2]);// /etc/security/keytabs/storm.keytab
 
         Configuration configuration = new Configuration();
         AutoHDFS autoHDFS = new AutoHDFS();

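On the topology side, enabling AutoHDFS only requires listing it under topology.auto-credentials; nimbus (configured per the README change above) fetches and renews the tokens. A minimal sketch using only the class name and config key already present in this patch:

    import java.util.Arrays;

    import backtype.storm.Config;

    public class AutoHdfsTopologyConfSketch {
        public static Config buildConfig() {
            Config conf = new Config();
            // Workers load AutoHDFS to pull the delegation tokens nimbus pushed into the credentials map;
            // nimbus must list the same class in nimbus.autocredential.plugins.classes and
            // nimbus.credential.renewers.classes (see the README hunk above).
            conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                     Arrays.asList("org.apache.storm.hdfs.common.security.AutoHDFS"));
            return conf;
        }
    }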
http://git-wip-us.apache.org/repos/asf/storm/blob/e590a727/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
index d8b7f5d..5847a58 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
@@ -47,25 +47,6 @@ public class HdfsSecurityUtil {
     private static final Logger LOG = LoggerFactory.getLogger(HdfsSecurityUtil.class);
 
     public static void login(Map conf, Configuration hdfsConfig) throws IOException {
-        //Add all tokens
-        AccessControlContext context = AccessController.getContext();
-        Subject subject = Subject.getSubject(context);
-
-        if(subject != null) {
-            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
-            if (privateCredentials != null) {
-                for (Credentials cred : privateCredentials) {
-                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
-                    if (allTokens != null) {
-                        for (Token<? extends TokenIdentifier> token : allTokens) {
-                            UserGroupInformation.getCurrentUser().addToken(token);
-                            LOG.info("Added delegation tokens to UGI.");
-                        }
-                    }
-                }
-            }
-        }
-
         //If AutoHDFS is specified, do not attempt to login using keytabs, only kept for backward compatibility.
         if(conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
                 !(((List)conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHDFS.class.getName()))) {

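Restated outside the diff, the remaining logic in HdfsSecurityUtil.login is: attempt a keytab login only when AutoHDFS is not listed in topology.auto-credentials. A simplified sketch of that check (not the literal method body; the key names are the hdfs.keytab.file / hdfs.kerberos.principal properties from the README):

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.SecurityUtil;

    import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;

    public class KeytabFallbackSketch {
        private static final String AUTO_HDFS = "org.apache.storm.hdfs.common.security.AutoHDFS";

        // Simplified restatement of the fallback path: keytab login only when AutoHDFS is not configured.
        public static void loginIfNoAutoHdfs(Map conf, Configuration hdfsConfig) throws IOException {
            List plugins = (List) conf.get(TOPOLOGY_AUTO_CREDENTIALS);
            boolean autoHdfsConfigured = plugins != null && plugins.contains(AUTO_HDFS);
            if (!autoHdfsConfigured && hdfsConfig.get("hdfs.keytab.file") != null) {
                SecurityUtil.login(hdfsConfig, "hdfs.keytab.file", "hdfs.kerberos.principal");
            }
        }
    }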

[07/11] storm git commit: Removing unused imports, deleting accidentally added file.

Posted by bo...@apache.org.
Removing unused imports, deleting accidentally added file.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88a8658c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88a8658c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88a8658c

Branch: refs/heads/master
Commit: 88a8658c619b42ac63b3371199f57969aeb5edb0
Parents: e590a72
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 11 20:25:35 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:18:22 2014 -0800

----------------------------------------------------------------------
 .../apache/storm/hbase/PersistentWordCount.java | 92 --------------------
 .../storm/hbase/security/HBaseSecurityUtil.java | 12 +--
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    | 10 ---
 .../hdfs/common/security/HdfsSecurityUtil.java  | 11 +--
 .../apache/storm/hdfs/trident/HdfsState.java    | 10 ---
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  4 +-
 6 files changed, 6 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/88a8658c/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
deleted file mode 100644
index 94a8ff5..0000000
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/PersistentWordCount.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase;
-
-import org.apache.storm.hbase.bolt.HBaseBolt;
-import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
-import org.apache.storm.hbase.security.HBaseSecurityUtil;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.storm.hbase.topology.WordCounter;
-import org.apache.storm.hbase.topology.WordSpout;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class PersistentWordCount {
-    private static final String WORD_SPOUT = "WORD_SPOUT";
-    private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String HBASE_BOLT = "HBASE_BOLT";
-
-
-    public static void main(String[] args) throws Exception {
-        Config config = new Config();
-
-        Map<String, Object> hbConf = new HashMap<String, Object>();
-        if(args.length > 0){
-            hbConf.put("hbase.rootdir", args[0]);
-        }
-        config.put("hbase.conf", hbConf);
-
-        WordSpout spout = new WordSpout();
-        WordCounter bolt = new WordCounter();
-
-        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
-                .withRowKeyField("word")
-                .withColumnFields(new Fields("word"))
-                .withCounterFields(new Fields("count"))
-                .withColumnFamily("cf");
-
-        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
-                .withConfigKey("hbase.conf");
-
-
-        // wordSpout ==> countBolt ==> HBaseBolt
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
-
-
-        if (args.length == 1) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.createTopology());
-            Thread.sleep(30000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else if (args.length == 2) {
-            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
-        } else if (args.length == 4) {
-            System.out.println("hdfs url: " + args[0] + ", keytab file: " + args[2] + 
-                ", principal name: " + args[3] + ", toplogy name: " + args[1]);
-            hbConf.put(HBaseSecurityUtil.STORM_KEYTAB_FILE_KEY, args[2]);
-            hbConf.put(HBaseSecurityUtil.STORM_USER_NAME_KEY, args[3]);
-            config.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
-        } else {
-            System.out.println("Usage: PersistentWordCount <hbase.rootdir> [topology name] [keytab file] [principal name]");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88a8658c/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index 36a4b89..99dfeba 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -17,26 +17,18 @@
  */
 package org.apache.storm.hbase.security;
 
-import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
 
 /**
  * This class provides util methods for storm-hbase connector communicating

http://git-wip-us.apache.org/repos/asf/storm/blob/88a8658c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 7e2369d..006ec25 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -25,10 +25,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.format.RecordFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -37,16 +33,10 @@ import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.IOException;
 import java.net.URI;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.Principal;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.Set;
 
 public class HdfsBolt extends AbstractHdfsBolt{
     private static final Logger LOG = LoggerFactory.getLogger(HdfsBolt.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/88a8658c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
index 5847a58..9e390e9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/security/HdfsSecurityUtil.java
@@ -17,24 +17,17 @@
  */
 package org.apache.storm.hdfs.common.security;
 
-import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.IOException;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
 
 /**
  * This class provides util methods for storm-hdfs connector communicating

http://git-wip-us.apache.org/repos/asf/storm/blob/88a8658c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 511d2b2..cde0e56 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.hdfs.trident;
 
-import backtype.storm.Config;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
 import org.apache.hadoop.conf.Configuration;
@@ -27,18 +26,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.AutoHDFS;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
-
 import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,12 +39,9 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
-import javax.security.auth.Subject;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
-import java.security.AccessControlContext;
-import java.security.AccessController;
 import java.util.*;
 
 public class HdfsState implements State {

http://git-wip-us.apache.org/repos/asf/storm/blob/88a8658c/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index b8a8815..1fbf7f0 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1053,8 +1053,8 @@
                 topology (normalize-topology total-storm-conf topology)
 
                 storm-cluster-state (:storm-cluster-state nimbus)]
-            (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
-              (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf)))
+            (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)]
+              (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
             (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) 
               (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
             (system-topology! total-storm-conf topology) ;; this validates the structure of the topology


[06/11] storm git commit: Removing unused configs.

Posted by bo...@apache.org.
Removing unused configs.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b7b20fc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b7b20fc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b7b20fc8

Branch: refs/heads/master
Commit: b7b20fc84836d5a4b11ba343453a7607900cf595
Parents: 0de45c2
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 10 20:46:04 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:18:22 2014 -0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java | 16 ----------------
 1 file changed, 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b7b20fc8/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index c680354..aeb6725 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1240,22 +1240,6 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
     public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
 
-    /**
-     * HDFS information, used to get the delegation token on behalf of the topology
-     * submitter user and renew the tokens. see {@link backtype.storm.security.auth.hadoop.AutoHDFS}
-     * kerberos principal name with realm should be provided.
-     */
-    public static final Object TOPOLOGY_HDFS_PRINCIPAL = "topology.hdfs.user";
-    public static final Object TOPOLOGY_HDFS_PRINCIPAL_SCHEMA = String.class;
-
-    /**
-     * The HDFS URI to be used by AutoHDFS.java to grab the delegation token on topology
-     * submitter user's behalf by the nimbus. If this is not provided the default URI provided
-     * in the hdfs configuration files will be used.
-     */
-    public static final Object TOPOLOGY_HDFS_URI = "topology.hdfs.uri";
-    public static final Object TOPOLOGY_HDFS_URI_SCHEMA = String.class;
-
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }


[11/11] storm git commit: Added STORM-444 to Changelog

Posted by bo...@apache.org.
Added STORM-444 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e71e2a3f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e71e2a3f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e71e2a3f

Branch: refs/heads/master
Commit: e71e2a3f745ee770e20dfd9c878b26f5123877f4
Parents: 5b26e96
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jan 6 16:43:35 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jan 6 16:43:35 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e71e2a3f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1f55539..f15fc27 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@
  * STORM-442: multilang ShellBolt/ShellSpout die() can be hang when Exception happened
  * STORM-599: Use use nimbus's cached heartbeats rather than fetching again from ZK
  * STORM-410: Add groups support to log-viewer
+ * STORM-444: Add AutoHDFS like credential fetching for HBase
 
 ## 0.9.3-rc2
  * STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor