You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/06 23:44:13 UTC
[03/11] storm git commit: Moving AutoHDFS and AutoHBase to storm-hdfs
and storm-hbase. Making storm-hbase and storm-hdfs bolts/states work with
delegation tokens.
Moving AutoHDFS and AutoHBase to storm-hdfs and storm-hbase. Making storm-hbase and storm-hdfs bolts/states work with delegation tokens.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a37def89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a37def89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a37def89
Branch: refs/heads/master
Commit: a37def89645f62a5df88ac6b46de3743703bb051
Parents: fefaf9b
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 10 17:51:48 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:16:44 2014 -0800
----------------------------------------------------------------------
external/storm-hbase/pom.xml | 17 +-
.../storm/hbase/bolt/AbstractHBaseBolt.java | 9 +-
.../apache/storm/hbase/security/AutoHBase.java | 214 +++++++++++++++++++
.../storm/hbase/security/HBaseSecurityUtil.java | 58 ++++-
.../storm/hbase/trident/state/HBaseState.java | 9 +-
5 files changed, 294 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index fb9aaf1..b2f8f44 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -36,7 +36,7 @@
</developers>
<properties>
- <hbase.version>0.98.1-hadoop2</hbase.version>
+ <hbase.version>0.98.4-hadoop2</hbase.version>
<hdfs.version>2.2.0</hdfs.version>
</properties>
@@ -49,6 +49,21 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 5f6621b..d814117 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.hbase.bolt;
+import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
@@ -28,6 +29,7 @@ import org.apache.storm.hbase.common.HBaseClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Map;
// TODO support more configuration options, for now we're defaulting to the hbase-*.xml files found on the classpath
@@ -57,6 +59,7 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
if(conf == null) {
throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
}
+
if(conf.get("hbase.rootdir") == null) {
LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
}
@@ -64,6 +67,10 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt {
hbConfig.set(key, String.valueOf(conf.get(key)));
}
- this.hBaseClient = new HBaseClient(conf, hbConfig, tableName);
+ //hack for backward compatibility: we need to pass TOPOLOGY_AUTO_CREDENTIALS to the hbase conf.
+ //the conf instance is an instance of persistentMap, so we make a copy.
+ Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+ hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+ this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
new file mode 100644
index 0000000..85f7683
--- /dev/null
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/AutoHBase.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hbase.security;
+
+import backtype.storm.Config;
+import backtype.storm.security.INimbusCredentialPlugin;
+import backtype.storm.security.auth.IAutoCredentials;
+import backtype.storm.security.auth.ICredentialsRenewer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Automatically gets HBase delegation tokens and pushes them to the user's topology. The class
+ * assumes that the hadoop/hbase configuration files are on your class path.
+ */
+public class AutoHBase implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin {
+ private static final Logger LOG = LoggerFactory.getLogger(AutoHBase.class);
+
+ public static final String HBASE_CREDENTIALS = "HBASE_CREDENTIALS";
+ public static final String HBASE_KEYTAB_FILE_KEY = "hbase.keytab.file";
+ public static final String HBASE_PRINCIPAL_KEY = "hbase.kerberos.principal";
+
+ @Override
+ public void prepare(Map conf) {
+ // No-op: the body is intentionally empty; conf is not read here.
+ }
+
+ @Override
+ public void shutdown() {
+ // No-op: the body is intentionally empty.
+ }
+
+ @Override
+ public void populateCredentials(Map<String, String> credentials, Map conf) { // fetches fresh credentials via getHadoopCredentials(conf) and stores them in the given map
+ try {
+ credentials.put(getCredentialKey(), DatatypeConverter.printBase64Binary(getHadoopCredentials(conf))); // byte[] is Base64-encoded so it fits the String-valued map
+ } catch (Exception e) {
+ LOG.warn("Could not populate HBase credentials.", e); // best effort: the failure is logged and not rethrown, the map is left without a new entry
+ }
+ }
+
+ @Override
+ public void populateCredentials(Map<String, String> credentials) {
+ // No-op: this overload adds nothing to the map; only the two-argument overload populates it.
+ }
+
+ /*
+ * Decodes the Base64 entry stored under getCredentialKey() back into Hadoop Credentials.
+ * @param credentials map with creds; may be null.
+ * @return instance of org.apache.hadoop.security.Credentials, or null when the map is null or
+ * has no entry for getCredentialKey(). populateCredentials is what writes that entry.
+ */
+ @SuppressWarnings("unchecked")
+ protected Object getCredentials(Map<String, String> credentials) {
+ Credentials credential = null;
+ if (credentials != null && credentials.containsKey(getCredentialKey())) {
+ try {
+ byte[] credBytes = DatatypeConverter.parseBase64Binary(credentials.get(getCredentialKey()));
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(credBytes)); // mirrors the ObjectOutputStream used in getHadoopCredentials
+
+ credential = new Credentials();
+ credential.readFields(in); // NOTE(review): if this throws, the partially read object is still returned below - confirm that is intended
+ LOG.info("Got hbase credentials from credentials Map.");
+ } catch (Exception e) {
+ LOG.warn("Could not obtain credentials from credentials map.", e); // logged, not rethrown
+ }
+ }
+ return credential;
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to addCredentialToSubject.
+ */
+ @Override
+ public void updateSubject(Subject subject, Map<String, String> credentials) {
+ addCredentialToSubject(subject, credentials);
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to addCredentialToSubject.
+ */
+ @Override
+ public void populateSubject(Subject subject, Map<String, String> credentials) {
+ addCredentialToSubject(subject, credentials);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void addCredentialToSubject(Subject subject, Map<String, String> credentials) { // shared implementation behind updateSubject and populateSubject
+ try {
+ Object credential = getCredentials(credentials); // null when the map is null or has no entry for getCredentialKey()
+ if (credential != null) {
+ subject.getPrivateCredentials().add(credential); // attach the decoded credentials to the subject's private credential set
+ LOG.info("Hbase credentials added to subject.");
+ } else {
+ LOG.info("No credential found in credentials map.");
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize and get UserGroupInformation.", e); // NOTE(review): message mentions UserGroupInformation, but this method makes no UGI call; failure is logged, not rethrown
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected byte[] getHadoopCredentials(Map conf) { // logs in with the configured keytab, obtains an HBase token for the topology submitter and returns the serialized Credentials
+ try {
+ final Configuration hbaseConf = HBaseConfiguration.create();
+ if(UserGroupInformation.isSecurityEnabled()) {
+ final String topologySubmitterUser = (String) conf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL);
+
+ UserProvider provider = UserProvider.instantiate(hbaseConf);
+
+ hbaseConf.set(HBASE_KEYTAB_FILE_KEY, conf.get(HBASE_KEYTAB_FILE_KEY).toString()); // a missing key makes conf.get(...) null, so toString() throws NullPointerException (wrapped by the catch below)
+ hbaseConf.set(HBASE_PRINCIPAL_KEY, conf.get(HBASE_PRINCIPAL_KEY).toString()); // same null hazard as the line above
+ provider.login(HBASE_KEYTAB_FILE_KEY, HBASE_PRINCIPAL_KEY, InetAddress.getLocalHost().getCanonicalHostName()); // the first two arguments are the configuration key names; their values were set on hbaseConf just above
+
+ LOG.info("Logged into Hbase as principal = " + conf.get(HBASE_PRINCIPAL_KEY));
+ UserGroupInformation.setConfiguration(hbaseConf);
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ final UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, ugi); // proxy UGI for the submitter principal, created on top of the current user
+
+ User user = User.create(ugi);
+
+ if(user.isHBaseSecurityEnabled(hbaseConf)) {
+ TokenUtil.obtainAndCacheToken(hbaseConf, proxyUser); // the token is requested for proxyUser, whose credentials are serialized below
+
+ LOG.info("Obtained HBase tokens, adding to user credentials.");
+
+ Credentials credential= proxyUser.getCredentials();
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bao); // getCredentials reads this back through an ObjectInputStream
+ credential.write(out);
+ out.flush();
+ out.close();
+ return bao.toByteArray();
+ } else {
+ throw new RuntimeException("Security is not enabled for HBase."); // thrown inside the try, so it is re-wrapped by the catch below
+ }
+ } else {
+ throw new RuntimeException("Security is not enabled for Hadoop"); // thrown inside the try, so it is re-wrapped by the catch below
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to get delegation tokens." , ex); // every failure surfaces as this RuntimeException with the original as its cause
+ }
+ }
+
+ @Override
+ public void renew(Map<String, String> credentials, Map topologyConf) {
+ //HBase tokens are not renewable, so "renewing" means obtaining new ones and storing them in the map again.
+ populateCredentials(credentials, topologyConf);
+ }
+
+ protected String getCredentialKey() { // key under which the Base64-encoded credentials are stored in the credentials map
+ return HBASE_CREDENTIALS;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public static void main(String[] args) throws Exception { // manual smoke test: expects three arguments; fewer raises ArrayIndexOutOfBoundsException
+ Map conf = new HashMap();
+ conf.put(Config.TOPOLOGY_SUBMITTER_PRINCIPAL, args[0]); // submitter principal with realm, e.g. storm@WITZEND.COM
+ conf.put(HBASE_PRINCIPAL_KEY,args[1]); // hbase principal, e.g. storm-hbase@WITZEND.COM
+ conf.put(HBASE_KEYTAB_FILE_KEY,args[2]); // storm hbase keytab, e.g. /etc/security/keytabs/storm-hbase.keytab
+
+ AutoHBase autoHBase = new AutoHBase();
+ autoHBase.prepare(conf);
+
+ Map<String,String> creds = new HashMap<String, String>();
+ autoHBase.populateCredentials(creds, conf); // failures are only logged, so creds may still be empty afterwards
+ LOG.info("Got HBase credentials" + autoHBase.getCredentials(creds));
+
+ Subject s = new Subject();
+ autoHBase.populateSubject(s, creds);
+ LOG.info("Got a Subject " + s);
+
+ autoHBase.renew(creds, conf); // renew fetches new credentials and overwrites the entry in creds
+ LOG.info("renewed credentials" + autoHBase.getCredentials(creds));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
index bb53478..d941b66 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
@@ -17,35 +17,73 @@
*/
package org.apache.storm.hbase.security;
+import static backtype.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
import java.io.IOException;
import java.net.InetAddress;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* This class provides util methods for storm-hbase connector communicating
* with secured HBase.
*/
public class HBaseSecurityUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);
+
public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
- UserProvider provider = UserProvider.instantiate(hbaseConfig);
- if (UserGroupInformation.isSecurityEnabled()) {
- String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
- if (keytab != null) {
- hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+ AccessControlContext context = AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+
+ if (subject != null) {
+ Set<Credentials> privateCredentials = subject.getPrivateCredentials(Credentials.class);
+ if (privateCredentials != null) {
+ for (Credentials cred : privateCredentials) {
+ Collection<Token<? extends TokenIdentifier>> allTokens = cred.getAllTokens();
+ if (allTokens != null) {
+ for (Token<? extends TokenIdentifier> token : allTokens) {
+ UserGroupInformation.getCurrentUser().addToken(token);
+ LOG.info("Added Hbase delegation tokens to UGI.");
+ }
+ }
+ }
}
- String userName = (String) conf.get(STORM_USER_NAME_KEY);
- if (userName != null) {
- hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+ }
+
+ //Allowing keytab based login for backward compatibility.
+ UserProvider provider = UserProvider.instantiate(hbaseConfig);
+ if (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
+ !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName()))) {
+ LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
+ if (keytab != null) {
+ hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
+ }
+ String userName = (String) conf.get(STORM_USER_NAME_KEY);
+ if (userName != null) {
+ hbaseConfig.set(STORM_USER_NAME_KEY, userName);
+ }
+ provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
+ InetAddress.getLocalHost().getCanonicalHostName());
}
- provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
- InetAddress.getLocalHost().getCanonicalHostName());
}
return provider;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a37def89/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 7b31fad..20ff0d3 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.hbase.trident.state;
+import backtype.storm.Config;
import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
@@ -35,6 +36,7 @@ import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -109,7 +111,12 @@ public class HBaseState implements State {
}
}
- this.hBaseClient = new HBaseClient(conf, hbConfig, options.tableName);
+ //hack for backward compatibility: we need to pass TOPOLOGY_AUTO_CREDENTIALS to the hbase conf.
+ //the conf instance is an instance of persistentMap, so we make a copy.
+ Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
+ hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
+
+ this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, options.tableName);
}
@Override