You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/06 23:44:13 UTC

[03/11] storm git commit: Moving AutoHDFS and AutoHBase to storm-hdfs and storm-hbase. Making storm-hbase and storm-hdfs bolts/states work with delegation tokens.

Moving AutoHDFS and AutoHBase to storm-hdfs and storm-hbase. Making storm-hbase and storm-hdfs bolts/states work with delegation tokens.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a37def89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a37def89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a37def89

Branch: refs/heads/master
Commit: a37def89645f62a5df88ac6b46de3743703bb051
Parents: fefaf9b
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 10 17:51:48 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:16:44 2014 -0800

----------------------------------------------------------------------
 external/storm-hbase/pom.xml                    |  17 +-
 .../storm/hbase/bolt/AbstractHBaseBolt.java     |   9 +-
 .../apache/storm/hbase/security/AutoHBase.java  | 214 +++++++++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  58 ++++-
 .../storm/hbase/trident/state/HBaseState.java   |   9 +-
 5 files changed, 294 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index fb9aaf1..b2f8f44 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -36,7 +36,7 @@
     </developers>
 
     <properties>
-        <hbase.version>0.98.1-hadoop2</hbase.version>
+        <hbase.version>0.98.4-hadoop2</hbase.version>
         <hdfs.version>2.2.0</hdfs.version>
     </properties>
 
@@ -49,6 +49,21 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>${hbase.version}</version>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 5f6621b..d814117 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.hbase.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
@@ -28,6 +29,7 @@ import org.apache.storm.hbase.common.HBaseClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 // TODO support more configuration options, for now we're defaulting to the hbase-*.xml files found on the classpath
@@ -57,6 +59,7 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
         if(conf == null) {
             throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
         }
+
         if(conf.get("hbase.rootdir") == null) {
             LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
         }
@@ -64,6 +67,10 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
             hbConfig.set(key, String.valueOf(conf.get(key)));
         }
 
-        this.hBaseClient = new HBaseClient(conf, hbConfig, tableName);
+        //Hack for backward compatibility: we need to pass TOPOLOGY_AUTO_CREDENTIALS to the hbase conf.
+        //The conf instance is an instance of persistentMap, so we make a copy.
+        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+        this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
new file mode 100644
index 0000000..85f7683
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.security;
+
+import backtype.storm.Config;
+import backtype.storm.security.INimbusCredentialPlugin;
+import backtype.storm.security.auth.IAutoCredentials;
+import backtype.storm.security.auth.ICredentialsRenewer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Automatically get hbase delegation tokens and push them to the user's topology. The class
+ * assumes that hadoop/hbase configuration files are in your class path.
+ */
+public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
+
+    public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
+    public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
+    public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
+
+    @Override
+    public void prepare(Map conf) {
+        //no op.
+    }
+
+    @Override
+    public void shutdown() {
+        //no op.
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials, Map conf) {
+        try {
+            credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+        } catch (Exception e) {
+            LOG.warn("Could not populate HBase credentials.", e);
+        }
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        //no op.
+    }
+
+    /*
+ *
+ * @param credentials map with creds.
+ * @return instance of org.apache.hadoop.security.Credentials.
+ * this class's populateCredentials must have been called before.
+ */
+    @SuppressWarnings("unchecked")
+    protected Object getCredentials(Map<String, String> credentials) {
+        Credentials credential = null;
+        if (credentials != null && credentials.containsKey(getCredentialKey())) {
+            try {
+                byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
+                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes));
+
+                credential = new Credentials();
+                credential.readFields(in);
+                LOG.info("Got hbase credentials from credentials Map.");
+            } catch (Exception e) {
+                LOG.warn("Could not obtain credentials from credentials map.", e);
+            }
+        }
+        return credential;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        addCredentialToSubject(subject, credentials);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void addCredentialToSubject(Subject subject, Map<String, String> credentials) {
+        try {
+            Object credential = getCredentials(credentials);
+            if (credential != null) {
+                subject.getPrivateCredentials().add(credential);
+                LOG.info("Hbase credentials added to subject.");
+            } else {
+                LOG.info("No credential found in credentials map.");
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to initialize and get UserGroupInformation.", e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected byte[] getHadoopCredentials(Map conf) {
+        try {
+            final Configuration hbaseConf = HBaseConfiguration.create();
+            if(UserGroupInformation.isSecurityEnabled()) {
+                final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+                UserProvider provider = UserProvider.instantiate(hbaseConf);
+
+                hbaseConf.set(HBASE_KEYTAB_FILE_KEY, conf.get(HBASE_KEYTAB_FILE_KEY).toString());
+                hbaseConf.set(HBASE_PRINCIPAL_KEY, conf.get(HBASE_PRINCIPAL_KEY).toString());
+                provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName());
+
+                LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
+                UserGroupInformation.setConfiguration(hbaseConf);
+
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi);
+
+                User user = User.create(ugi);
+
+                if(user.isHBaseSecurityEnabled(hbaseConf)) {
+                    TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser);
+
+                    LOG.info("Obtained HBase tokens, adding to user credentials.");
+
+                    Credentials credential= proxyUser.getCredentials();
+                    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+                    ObjectOutputStream out = new ObjectOutputStream(bao);
+                    credential.write(out);
+                    out.flush();
+                    out.close();
+                    return bao.toByteArray();
+                } else {
+                    throw new RuntimeException("Security is not enabled for HBase.");
+                }
+            } else {
+                throw new RuntimeException("Security is not enabled for Hadoop");
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get delegation tokens." , ex);
+        }
+    }
+
+    @Override
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        //HBASE tokens are not renewable so we always have to get new ones.
+        populateCredentials(credentials, topologyConf);
+    }
+
+    protected String getCredentialKey() {
+        return HBASE_CREDENTIALS;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        Map conf = new HashMap();
+        conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); //with realm e.g. storm@WITZEND.COM
+        conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal storm-hbase@WITZEN.COM
+        conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab /etc/security/keytabs/storm-hbase.keytab
+
+        AutoHBase autoHBase = new AutoHBase();
+        autoHBase.prepare(conf);
+
+        Map<String,String> creds  = new HashMap<String, String>();
+        autoHBase.populateCredentials(creds, conf);
+        LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
+
+        Subject s = new Subject();
+        autoHBase.populateSubject(s, creds);
+        LOG.info("Got a Subject " + s);
+
+        autoHBase.renew(creds, conf);
+        LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index bb53478..d941b66 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -17,35 +17,73 @@
  */
 package org.apache.storm.hbase.security;
 
+import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class provides util methods for storm-hbase connector communicating
  * with secured HBase.
  */
 public class HBaseSecurityUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
+
     public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
     public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
 
     public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
-        UserProvider provider = UserProvider.instantiate(hbaseConfig);
-        if (UserGroupInformation.isSecurityEnabled()) {
-            String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
-            if (keytab != null) {
-                hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+        AccessControlContext context = AccessController.getContext();
+        Subject subject = Subject.getSubject(context);
+
+        if (subject != null) {
+            Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+            if (privateCredentials != null) {
+                for (Credentials cred : privateCredentials) {
+                    Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+                    if (allTokens != null) {
+                        for (Token<? extends TokenIdentifier> token : allTokens) {
+                            UserGroupInformation.getCurrentUser().addToken(token);
+                            LOG.info("Added Hbase delegation tokens to UGI.");
+                        }
+                    }
+                }
             }
-            String userName = (String) conf.get(STORM_USER_NAME_KEY);
-            if (userName != null) {
-                hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+        }
+
+        //Allowing keytab based login for backward compatibility.
+        UserProvider provider = UserProvider.instantiate(hbaseConfig);
+        if (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName()))) {
+            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+            if (UserGroupInformation.isSecurityEnabled()) {
+                String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+                if (keytab != null) {
+                    hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+                }
+                String userName = (String) conf.get(STORM_USER_NAME_KEY);
+                if (userName != null) {
+                    hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+                }
+                provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
+                        InetAddress.getLocalHost().getCanonicalHostName());
             }
-            provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY, 
-                InetAddress.getLocalHost().getCanonicalHostName());
         }
         return provider;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 7b31fad..20ff0d3 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.hbase.trident.state;
 
+import backtype.storm.Config;
 import backtype.storm.topology.FailedException;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
@@ -35,6 +36,7 @@ import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -109,7 +111,12 @@ public class HBaseState implements State {
             }
         }
 
-        this.hBaseClient = new HBaseClient(conf, hbConfig, options.tableName);
+        //Hack for backward compatibility: we need to pass TOPOLOGY_AUTO_CREDENTIALS to the hbase conf.
+        //The conf instance is an instance of persistentMap, so we make a copy.
+        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+
+        this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, options.tableName);
     }
 
     @Override