You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:07:01 UTC
[06/14] STORM-216: Added Authentication and Authorization.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
new file mode 100644
index 0000000..807abe3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.Principal;
+import java.util.Map;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+
+/**
+ * Custom LoginModule to enable Auto Login based on cached ticket.
+ *
+ * <p>{@link #login()} picks up the ticket currently held by {@code AutoTGT.kerbTicket},
+ * {@link #commit()} attaches that ticket and its client principal to the Subject, and
+ * {@link #logout()} detaches them again.
+ */
+public class AutoTGTKrb5LoginModule implements LoginModule {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoTGTKrb5LoginModule.class);
+
+    // initial state
+    private Subject subject;
+
+    // Ticket obtained by login(); non-null means the login phase succeeded.
+    protected KerberosTicket kerbTicket = null;
+
+    /**
+     * Remembers the Subject to populate. The callback handler, shared state and
+     * options are not used by this module.
+     */
+    @Override
+    public void initialize(Subject subject,
+                           CallbackHandler callbackHandler,
+                           Map<String, ?> sharedState,
+                           Map<String, ?> options) {
+
+        this.subject = subject;
+    }
+
+    /**
+     * Loads the ticket through {@link #getKerbTicketFromCache()}.
+     *
+     * @return true when a ticket was found
+     * @throws LoginException when no ticket is available
+     */
+    @Override
+    public boolean login() throws LoginException {
+        LOG.debug("Acquire TGT from Cache");
+        getKerbTicketFromCache();
+        if (kerbTicket != null) {
+            return true;
+        } else {
+            throw new LoginException("Authentication failed, the TGT not found.");
+        }
+    }
+
+    /** Reads the ticket from {@code AutoTGT.kerbTicket}; overridable so tests can inject one. */
+    protected void getKerbTicketFromCache() {
+        kerbTicket = AutoTGT.kerbTicket.get();
+    }
+
+    /**
+     * Returns the client principal of the current ticket, or null when there is no ticket.
+     * Overridable so tests can substitute a principal.
+     */
+    protected Principal getKerbTicketClient() {
+        if (kerbTicket != null) {
+            return kerbTicket.getClient();
+        }
+        return null;
+    }
+
+    /**
+     * Adds the ticket to the Subject's private credentials and the client principal
+     * to its principals.
+     *
+     * @return false when login() did not succeed, true once the Subject was updated
+     * @throws LoginException when the Subject is missing or read-only
+     */
+    @Override
+    public boolean commit() throws LoginException {
+        if (!isSucceeded()) {
+            return false;
+        }
+        if (subject == null || subject.isReadOnly()) {
+            kerbTicket = null;
+            throw new LoginException("Authentication failed because the Subject is invalid.");
+        }
+        // Let us add the kerbClientPrinc and kerbTicket
+        subject.getPrivateCredentials().add(kerbTicket);
+        subject.getPrincipals().add(getKerbTicketClient());
+        LOG.debug("Commit Succeeded.");
+        return true;
+    }
+
+    /**
+     * Undoes a successful login by delegating to {@link #logout()}.
+     *
+     * @return false when login() did not succeed, otherwise the result of logout()
+     */
+    @Override
+    public boolean abort() throws LoginException {
+        if (!isSucceeded()) {
+            return false;
+        } else {
+            return logout();
+        }
+    }
+
+    /**
+     * Removes the principal and ticket from the Subject (when it is writable) and
+     * forgets the ticket.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean logout() throws LoginException {
+        if (subject != null && !subject.isReadOnly() && kerbTicket != null) {
+            // Remove the principal obtained from the same hook commit() used, so a
+            // subclass overriding getKerbTicketClient() does not leave its principal behind.
+            Principal client = getKerbTicketClient();
+            if (client != null) {
+                subject.getPrincipals().remove(client);
+            }
+            subject.getPrivateCredentials().remove(kerbTicket);
+        }
+        kerbTicket = null;
+        return true;
+    }
+
+    /** The login phase counts as succeeded while a ticket is held. */
+    private boolean isSucceeded() {
+        return kerbTicket != null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
new file mode 100644
index 0000000..ba34fc9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.security.Principal;
+import javax.security.auth.kerberos.KerberosTicket;
+
+/**
+ * Variant of {@link AutoTGTKrb5LoginModule} for tests: the ticket and the client
+ * principal are supplied by the test instead of being looked up.
+ */
+public class AutoTGTKrb5LoginModuleTest extends AutoTGTKrb5LoginModule {
+
+    /** Principal returned by {@link #getKerbTicketClient()}; null until a test assigns it. */
+    public Principal client;
+
+    /** Installs the ticket directly, standing in for the cache lookup. */
+    public void setKerbTicket(KerberosTicket ticket) {
+        kerbTicket = ticket;
+    }
+
+    /** Skips the cache lookup so a ticket set via {@link #setKerbTicket} is left untouched. */
+    @Override
+    protected void getKerbTicketFromCache() {
+        // Do nothing.
+    }
+
+    /** Hands back the test-provided principal rather than the ticket's own client. */
+    @Override
+    protected Principal getKerbTicketClient() {
+        return client;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
new file mode 100644
index 0000000..d46aa8b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.security.auth.AuthUtils;
+
+/**
+ * SASL client side callback handler.
+ */
+public class ClientCallbackHandler implements CallbackHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientCallbackHandler.class);
+
+ /**
+ * Constructor based on a JAAS configuration
+ *
+ * For digest, you should have a pair of user name and password defined in this figgure.
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ public ClientCallbackHandler(Configuration configuration) throws IOException {
+ if (configuration == null) return;
+ AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
+ if (configurationEntries == null) {
+ String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
+ + "' entry in this configuration: Client cannot start.";
+ LOG.error(errorMessage);
+ throw new IOException(errorMessage);
+ }
+ }
+
+ /**
+ * This method is invoked by SASL for authentication challenges
+ * @param callbacks a collection of challenge callbacks
+ */
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback c : callbacks) {
+ if (c instanceof NameCallback) {
+ LOG.debug("name callback");
+ } else if (c instanceof PasswordCallback) {
+ LOG.debug("password callback");
+ LOG.warn("Could not login: the client is being asked for a password, but the " +
+ " client code does not currently support obtaining a password from the user." +
+ " Make sure that the client is configured to use a ticket cache (using" +
+ " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
+ " you still get this message after that, the TGT in the ticket cache has expired and must" +
+ " be manually refreshed. To do so, first determine if you are using a password or a" +
+ " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+ " is running this client using the command" +
+ " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
+ " If the latter, do" +
+ " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+ " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+ " restart this client. If you continue to see this message after manually refreshing" +
+ " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+ } else if (c instanceof AuthorizeCallback) {
+ LOG.debug("authorization callback");
+ AuthorizeCallback ac = (AuthorizeCallback) c;
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ ac.setAuthorizedID(authzid);
+ }
+ } else {
+ throw new UnsupportedCallbackException(c);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
new file mode 100644
index 0000000..451f87b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.Sasl;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.SaslTransportPlugin;
+
+/**
+ * SASL transport plugin that secures Thrift transports with the "GSSAPI" (Kerberos)
+ * SASL mechanism, using the JAAS login configuration held in {@code login_conf}.
+ */
+public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
+    public static final String KERBEROS = "GSSAPI";
+    private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
+
+    /**
+     * Logs in the server principal and builds the server-side transport factory.
+     *
+     * @return a factory that creates GSSAPI SASL server transports while running as the
+     *         logged-in server Subject
+     * @throws IOException when the server callback handler cannot be created
+     * @throws RuntimeException when the JAAS login fails or yields no Kerberos ticket
+     */
+    public TTransportFactory getServerTransportFactory() throws IOException {
+        //create an authentication callback handler
+        CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf);
+
+        //login our principal
+        Subject subject = null;
+        try {
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            //now login
+            Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
+            subject = login.getSubject();
+        } catch (LoginException ex) {
+            LOG.error("Server failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+            throw new RuntimeException("Fail to verify user principal with section \""
+                                       +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
+        }
+
+        // The configured principal supplies the service name and host for the SASL server definition.
+        String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+        LOG.debug("principal:"+principal);
+        KerberosName serviceKerberosName = new KerberosName(principal);
+        String serviceName = serviceKerberosName.getServiceName();
+        String hostName = serviceKerberosName.getHostName();
+        Map<String, String> props = new TreeMap<String,String>();
+        props.put(Sasl.QOP, "auth");
+        props.put(Sasl.SERVER_AUTH, "false");
+
+        //create a transport factory that will invoke our auth callback for GSSAPI
+        TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+        factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
+
+        //create a wrap transport factory so that we could apply user credential during connections
+        TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
+
+        LOG.info("SASL GSSAPI transport factory will be used");
+        return wrapFactory;
+    }
+
+    /**
+     * Logs in the client principal and wraps the given transport in a GSSAPI SASL
+     * client transport, which is opened while running as the logged-in Subject.
+     *
+     * @param transport the underlying transport to wrap
+     * @param serverHost host name of the server, passed to the SASL client
+     * @return the SASL client transport
+     * @throws RuntimeException when the JAAS login fails or yields no Kerberos ticket
+     */
+    public TTransport connect(TTransport transport, String serverHost) throws TTransportException, IOException {
+        //create an authentication callback handler
+        ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+
+        //login our user
+        Login login = null;
+        try {
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            //now login
+            login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
+        } catch (LoginException ex) {
+            // This is the client login path, so report it as a client failure.
+            LOG.error("Client failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+
+        final Subject subject = login.getSubject();
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
+            throw new RuntimeException("Fail to verify user principal with section \""
+                                       +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+        }
+
+        final String principal = getPrincipal(subject);
+        // Fall back to the default service name when the client section does not configure one.
+        String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+        if (serviceName == null) {
+            serviceName = AuthUtils.SERVICE;
+        }
+        Map<String, String> props = new TreeMap<String,String>();
+        props.put(Sasl.QOP, "auth");
+        props.put(Sasl.SERVER_AUTH, "false");
+
+        LOG.debug("SASL GSSAPI client transport is being established");
+        final TTransport sasalTransport = new TSaslClientTransport(KERBEROS,
+                                                                   principal,
+                                                                   serviceName,
+                                                                   serverHost,
+                                                                   props,
+                                                                   null,
+                                                                   transport);
+
+        //open Sasl transport with the login credential
+        try {
+            Subject.doAs(subject,
+                         new PrivilegedExceptionAction<Void>() {
+                             public Void run() {
+                                 try {
+                                     LOG.debug("do as:"+ principal);
+                                     sasalTransport.open();
+                                 }
+                                 catch (Exception e) {
+                                     // NOTE(review): a failed open() is only logged, so the caller
+                                     // still receives the (unopened) transport - confirm this is intended.
+                                     LOG.error("Client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
+                                 }
+                                 return null;
+                             }
+                         });
+        } catch (PrivilegedActionException e) {
+            throw new RuntimeException(e);
+        }
+
+        return sasalTransport;
+    }
+
+    /**
+     * Returns the name of the first principal of the Subject, or null when it has none.
+     */
+    private String getPrincipal(Subject subject) {
+        Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+        if (principals==null || principals.size()<1) {
+            LOG.info("No principal found in login subject");
+            return null;
+        }
+        return ((Principal)(principals.toArray()[0])).getName();
+    }
+
+    /** A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.
+     *
+     * This is used on the server side to assume the server's Principal when accepting
+     * clients.
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+        private final Subject subject;
+        private final TTransportFactory wrapped;
+
+        /**
+         * @param wrapped factory that actually creates the transports
+         * @param subject Subject to run as while creating them
+         */
+        public TUGIAssumingTransportFactory(TTransportFactory wrapped, Subject subject) {
+            this.wrapped = wrapped;
+            this.subject = subject;
+
+            Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+            if (principals.size()>0)
+                LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+        }
+
+        /**
+         * Creates the transport through the wrapped factory while running as the Subject.
+         *
+         * @return the wrapped factory's transport, or null when creating it failed
+         *         (the failure is logged)
+         */
+        @Override
+        public TTransport getTransport(final TTransport trans) {
+            try {
+                return Subject.doAs(subject,
+                                    new PrivilegedExceptionAction<TTransport>() {
+                                        public TTransport run() {
+                                            try {
+                                                return wrapped.getTransport(trans);
+                                            }
+                                            catch (Exception e) {
+                                                LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
+                                                return null;
+                                            }
+                                        }
+                                    });
+            } catch (PrivilegedActionException e) {
+                LOG.error("Storm server experienced a PrivilegedActionException exception while creating a transport using a JAAS principal context:" + e, e);
+                return null;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
new file mode 100644
index 0000000..9dc75c4
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth.kerberos;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import backtype.storm.security.auth.AuthUtils;
+
+/**
+ * SASL server side callback handler.
+ */
+public class ServerCallbackHandler implements CallbackHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
+
+    // Default name seen in the most recent NameCallback; only used for log output.
+    private String userName;
+
+    /**
+     * Verifies that the JAAS configuration has a server login context entry.
+     * A null configuration is accepted and skips the check.
+     *
+     * @param configuration JAAS configuration expected to hold the server login context entry
+     * @throws IOException when the configuration has no server login context entry
+     */
+    public ServerCallbackHandler(Configuration configuration) throws IOException {
+        if (configuration==null) return;
+
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+    }
+
+    /**
+     * Dispatches each callback to its handler.
+     *
+     * @param callbacks callbacks to answer
+     * @throws UnsupportedCallbackException for a callback type this handler does not know
+     */
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                handleNameCallback((NameCallback) callback);
+            } else if (callback instanceof PasswordCallback) {
+                handlePasswordCallback((PasswordCallback) callback);
+            } else if (callback instanceof AuthorizeCallback) {
+                handleAuthorizeCallback((AuthorizeCallback) callback);
+            } else if (callback instanceof RealmCallback) {
+                // Nothing to supply for a realm; tolerated without action.
+                LOG.debug("ignoring realm callback");
+            } else {
+                // Report unknown callbacks instead of silently skipping them, as the
+                // CallbackHandler contract describes.
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    /** Accepts the callback's default name and remembers it for later log messages. */
+    private void handleNameCallback(NameCallback nc) {
+        LOG.debug("handleNameCallback");
+        userName = nc.getDefaultName();
+        nc.setName(nc.getDefaultName());
+    }
+
+    /** No password is available here; the request is only logged. */
+    private void handlePasswordCallback(PasswordCallback pc) {
+        LOG.warn("No password found for user: " + userName);
+    }
+
+    /** Authorizes the request and sets the authorized id to the authentication id. */
+    private void handleAuthorizeCallback(AuthorizeCallback ac) {
+        String authenticationID = ac.getAuthenticationID();
+        LOG.debug("Successfully authenticated client: authenticationID=" + authenticationID);
+        ac.setAuthorized(true);
+
+        ac.setAuthorizedID(authenticationID);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf
new file mode 100644
index 0000000..92a1399
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_cluster.conf
@@ -0,0 +1,31 @@
+/*
+This is a sample JAAS configuration for Storm servers to handle Kerberos authentication
+*/
+
+/*
+ StormServer section should contain the info about the server keytab file and server principal.
+ In Storm, we have 2 thrift servers: Nimbus and DRPC. These servers could be assigned with
+ different principals.
+*/
+StormServer {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/storm_server.keytab"
+ storeKey=true
+ useTicketCache=false
+ principal="storm_service/carcloth.corp.acme.com@STORM.CORP.ACME.COM";
+};
+
+/*
+StormClient section should contain the info about the client keytab file and client principal.
+For example, Supervisors are clients of Nimbus, and we should assign keytab/principal for supervisors.
+*/
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ keyTab="/etc/storm_client.keytab"
+ storeKey=true
+ useTicketCache=false
+ serviceName="storm_service";
+};
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf
new file mode 100644
index 0000000..138e1f3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/kerberos/jaas_kerberos_launcher.conf
@@ -0,0 +1,12 @@
+/*
+ This is a sample JAAS configuration for Storm topology launcher/submitter.
+ Since launcher machines are typically accessible by many folks, we
+ encourage you to leverage "kinit", instead of keytab.
+*/
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ doNotPrompt=true
+ useTicketCache=true
+ serviceName="storm_service";
+};
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java b/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java
new file mode 100644
index 0000000..4d25ac7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/SingleUserSimpleTransport.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.testing;
+
+import backtype.storm.security.auth.SimpleTransportPlugin;
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.HashSet;
+
+
+public class SingleUserSimpleTransport extends SimpleTransportPlugin {
+ @Override
+ protected Subject getDefaultSubject() {
+ HashSet<Principal> principals = new HashSet<Principal>();
+ principals.add(new Principal() {
+ public String getName() { return "user"; }
+ public String toString() { return "user"; }
+ });
+ return new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java b/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java
new file mode 100644
index 0000000..3d4a463
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/transactional/state/TestTransactionalState.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.transactional.state;
+
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.utils.ZookeeperAuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Test helper that opens up non-public members of {@link TransactionalState}.
+ */
+public class TestTransactionalState extends TransactionalState {
+
+    /**
+     * Mirrors the parent's four-argument constructor, which is needed because the
+     * parent class has no default constructor.
+     */
+    protected TestTransactionalState(
+            Map conf, String id, Map componentConf, String subroot) {
+        super(conf, id, componentConf, subroot);
+    }
+
+    /**
+     * Public pass-through to {@code TransactionalState.createNode} with the same arguments.
+     */
+    public static void createNode(
+            CuratorFramework curator,
+            String rootDir,
+            byte[] data,
+            List<ACL> acls,
+            CreateMode mode) throws Exception {
+        TransactionalState.createNode(curator, rootDir, data, acls, mode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java b/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
index 91fc666..5afcd0a 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/state/TransactionalState.java
@@ -21,18 +21,25 @@ import backtype.storm.Config;
import backtype.storm.serialization.KryoValuesDeserializer;
import backtype.storm.serialization.KryoValuesSerializer;
import backtype.storm.utils.Utils;
+import backtype.storm.utils.ZookeeperAuthInfo;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.PathAndBytesable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
public class TransactionalState {
CuratorFramework _curator;
KryoValuesSerializer _ser;
KryoValuesDeserializer _des;
+ List<ACL> _zkAcls = null;
public static TransactionalState newUserState(Map conf, String id, Map componentConf) {
return new TransactionalState(conf, id, componentConf, "user");
@@ -51,26 +58,55 @@ public class TransactionalState {
componentConf
.get(Config.TOPOLOGY_KRYO_REGISTER));
}
- String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
+ String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
+ String rootDir = transactionalRoot + "/" + id + "/" + subroot;
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port);
+ ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
+ CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
+ _zkAcls = Utils.getWorkerACL(conf);
try {
- initter.create().creatingParentsIfNeeded().forPath(rootDir);
- } catch(KeeperException.NodeExistsException e) {
-
+ TransactionalState.createNode(initter, transactionalRoot, null, null, null);
+ } catch (KeeperException.NodeExistsException e) {
+ }
+ try {
+ TransactionalState.createNode(initter, rootDir, null, _zkAcls, null);
+ } catch (KeeperException.NodeExistsException e) {
}
-
initter.close();
- _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
+ _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
_ser = new KryoValuesSerializer(conf);
_des = new KryoValuesDeserializer(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+
+ protected static String forPath(PathAndBytesable<String> builder,
+ String path, byte[] data) throws Exception {
+ return (data == null)
+ ? builder.forPath(path)
+ : builder.forPath(path, data);
+ }
+
+ protected static void createNode(CuratorFramework curator, String path,
+ byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+ ProtectACLCreateModePathAndBytesable<String> builder =
+ curator.create().creatingParentsIfNeeded();
+ if (acls == null) {
+ if (mode == null ) {
+ TransactionalState.forPath(builder, path, data);
+ } else {
+ TransactionalState.forPath(builder.withMode(mode), path, data);
+ }
+ return;
+ }
+
+ TransactionalState.forPath(builder.withACL(acls), path, data);
+ }
+
public void setData(String path, Object obj) {
path = "/" + path;
byte[] ser = _ser.serializeObject(obj);
@@ -78,10 +114,8 @@ public class TransactionalState {
if(_curator.checkExists().forPath(path)!=null) {
_curator.setData().forPath(path, ser);
} else {
- _curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, ser);
+ TransactionalState.createNode(_curator, path, ser, _zkAcls,
+ CreateMode.PERSISTENT);
}
} catch(Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java b/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java
new file mode 100644
index 0000000..9d0ee92
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/ui/InvalidRequestException.java
@@ -0,0 +1,20 @@
+package backtype.storm.ui;
+
+public class InvalidRequestException extends Exception {
+
+ public InvalidRequestException() {
+ super();
+ }
+
+ public InvalidRequestException(String msg) {
+ super(msg);
+ }
+
+ public InvalidRequestException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public InvalidRequestException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
index cf38fb8..3218e49 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
@@ -17,68 +17,49 @@
*/
package backtype.storm.utils;
+import backtype.storm.Config;
import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.generated.DistributedRPC;
+import backtype.storm.generated.AuthorizationException;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
-public class DRPCClient implements DistributedRPC.Iface {
+import java.util.Map;
+
+public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
private TTransport conn;
private DistributedRPC.Client client;
private String host;
private int port;
private Integer timeout;
- public DRPCClient(String host, int port, Integer timeout) {
- try {
- this.host = host;
- this.port = port;
- this.timeout = timeout;
- connect();
- } catch(TException e) {
- throw new RuntimeException(e);
- }
- }
-
- public DRPCClient(String host, int port) {
- this(host, port, null);
+ public DRPCClient(Map conf, String host, int port) throws TTransportException {
+ this(conf, host, port, null);
}
-
- private void connect() throws TException {
- TSocket socket = new TSocket(host, port);
- if(timeout!=null) {
- socket.setTimeout(timeout);
- }
- conn = new TFramedTransport(socket);
- client = new DistributedRPC.Client(new TBinaryProtocol(conn));
- conn.open();
+
+ public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
+ super(conf, ThriftConnectionType.DRPC, host, port, timeout);
+ this.host = host;
+ this.port = port;
+ this.client = new DistributedRPC.Client(_protocol);
}
-
+
public String getHost() {
return host;
}
public int getPort() {
return port;
- }
+ }
- public String execute(String func, String args) throws TException, DRPCExecutionException {
- try {
- if(client==null) connect();
- return client.execute(func, args);
- } catch(TException e) {
- client = null;
- throw e;
- } catch(DRPCExecutionException e) {
- client = null;
- throw e;
- }
+ public String execute(String func, String args) throws TException, DRPCExecutionException, AuthorizationException {
+ return client.execute(func, args);
}
- public void close() {
- conn.close();
+ public DistributedRPC.Client getClient() {
+ return client;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 0d0ae07..f412ff3 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -23,16 +23,19 @@ import java.io.File;
import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes.
* Every read/write hits disk.
*/
public class LocalState {
+ public static Logger LOG = LoggerFactory.getLogger(LocalState.class);
private VersionedStore _vs;
public LocalState(String backingDir) throws IOException {
+ LOG.debug("New Local State for {}", backingDir);
_vs = new VersionedStore(backingDir);
}
@@ -83,8 +86,14 @@ public class LocalState {
private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
byte[] toWrite = Utils.serialize(val);
String newPath = _vs.createVersion();
- FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
+ File file = new File(newPath);
+ FileUtils.writeByteArrayToFile(file, toWrite);
+ if (toWrite.length != file.length()) {
+ throw new IOException("Tried to serialize " + toWrite.length +
+ " bytes to " + file.getCanonicalPath() + ", but " +
+ file.length() + " bytes were written.");
+ }
_vs.succeedVersion(newPath);
if(cleanup) _vs.cleanup(4);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index e93acc8..273e232 100644
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@ -19,6 +19,7 @@ package backtype.storm.utils;
import backtype.storm.Config;
import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
import backtype.storm.generated.Nimbus;
import java.util.Map;
import org.apache.thrift.transport.TTransportException;
@@ -32,8 +33,7 @@ public class NimbusClient extends ThriftClient {
public static NimbusClient getConfiguredClient(Map conf) {
try {
String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
- int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
- return new NimbusClient(conf, nimbusHost, nimbusPort);
+ return new NimbusClient(conf, nimbusHost);
} catch (TTransportException ex) {
throw new RuntimeException(ex);
}
@@ -44,7 +44,12 @@ public class NimbusClient extends ThriftClient {
}
public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
- super(conf, host, port, timeout);
+ super(conf, ThriftConnectionType.NIMBUS, host, port, timeout);
+ _client = new Nimbus.Client(_protocol);
+ }
+
+ public NimbusClient(Map conf, String host) throws TTransportException {
+ super(conf, ThriftConnectionType.NIMBUS, host, null, null);
_client = new Nimbus.Client(_protocol);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TestUtils.java b/storm-core/src/jvm/backtype/storm/utils/TestUtils.java
new file mode 100644
index 0000000..276559c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TestUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import java.util.Map;
+
+public class TestUtils extends Utils {
+
+ public static void testSetupBuilder(CuratorFrameworkFactory.Builder
+ builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
+ {
+ setupBuilder(builder, zkStr, conf, auth);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index a1fed96..c28d93a 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.utils;
+import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
@@ -45,7 +46,12 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
@@ -53,10 +59,13 @@ import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ComponentObject;
import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.AuthorizationException;
+
import clojure.lang.IFn;
import clojure.lang.RT;
public class Utils {
+ public static Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String DEFAULT_STREAM_ID = "default";
public static Object newInstance(String klass) {
@@ -254,7 +263,7 @@ public class Utils {
return ret;
}
- public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
+ public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
String id = client.getClient().beginFileDownload(file);
WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
@@ -307,6 +316,8 @@ public class Utils {
return (Integer) o;
} else if (o instanceof Short) {
return ((Short) o).intValue();
+ } else if (o instanceof String) {
+ return Integer.parseInt((String) o);
} else {
throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
}
@@ -316,11 +327,6 @@ public class Utils {
return UUID.randomUUID().getLeastSignificantBits();
}
-
- public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
- return newCurator(conf, servers, port, root, null);
- }
-
public static class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry {
protected final int maxRetryInterval;
@@ -350,32 +356,39 @@ public class Utils {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectString(zkStr)
- .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
- .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new BoundedExponentialBackoffRetry(
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+
+ setupBuilder(builder, zkStr, conf, auth);
+
+ return builder.build();
+ }
+
+ protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
+ {
+ builder.connectString(zkStr)
+ .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+ .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+ .retryPolicy(new BoundedExponentialBackoffRetry(
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING))));
- if(auth!=null && auth.scheme!=null) {
+ if(auth!=null && auth.scheme!=null && auth.payload!=null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
- return builder.build();
}
- public static CuratorFramework newCurator(Map conf, List<String> servers, Object port) {
- return newCurator(conf, servers, port, "");
+ public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
+ return newCurator(conf, servers, port, "", auth);
}
- public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root) {
- CuratorFramework ret = newCurator(conf, servers, port, root);
+ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
+ CuratorFramework ret = newCurator(conf, servers, port, root, auth);
ret.start();
return ret;
}
- public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
- CuratorFramework ret = newCurator(conf, servers, port);
+ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
+ CuratorFramework ret = newCurator(conf, servers, port, auth);
ret.start();
return ret;
}
@@ -413,6 +426,18 @@ public class Utils {
return ret;
}
+ public static void readAndLogStream(String prefix, InputStream in) {
+ try {
+ BufferedReader r = new BufferedReader(new InputStreamReader(in));
+ String line = null;
+ while ((line = r.readLine())!= null) {
+ LOG.info("{}:{}", prefix, line);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error whiel trying to log stream", e);
+ }
+ }
+
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
Throwable t = throwable;
while(t != null) {
@@ -423,4 +448,67 @@ public class Utils {
}
return false;
}
+
+ /**
+ * Is the cluster configured to interact with ZooKeeper in a secure way?
+ * This only works when called from within Nimbus or a Supervisor process.
+ * @param conf the storm configuration, not the topology configuration
+ * @return true if it is configured else false.
+ */
+ public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
+ return null != System.getProperty("java.security.auth.login.config")
+ || (conf != null
+ && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
+ && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+ }
+
+ /**
+ * Is the topology configured to have ZooKeeper authentication.
+ * @param conf the topology configuration
+ * @return true if ZK is configured else false
+ */
+ public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
+ return (conf != null
+ && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
+ && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+ }
+
+ public static List<ACL> getWorkerACL(Map conf) {
+ //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
+ if (!isZkAuthenticationConfiguredTopology(conf)) {
+ return null;
+ }
+ String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
+ if (stormZKUser == null) {
+ throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+ }
+ String[] split = stormZKUser.split(":",2);
+ if (split.length != 2) {
+ throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ }
+ ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
+ ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
+ return ret;
+ }
+
+ public static String threadDump() {
+ final StringBuilder dump = new StringBuilder();
+ final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
+ final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+ for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append("\" ");
+ final Thread.State state = threadInfo.getThreadState();
+ dump.append("\n java.lang.Thread.State: ");
+ dump.append(state);
+ final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+ for (final StackTraceElement stackTraceElement : stackTraceElements) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n\n");
+ }
+ return dump.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java b/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
index a5a2e9a..d972135 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ZookeeperAuthInfo.java
@@ -27,8 +27,13 @@ public class ZookeeperAuthInfo {
public byte[] payload = null;
public ZookeeperAuthInfo(Map conf) {
- String scheme = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME);
- String payload = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD);
+ String scheme = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME);
+ String payload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+
+ if (scheme == null || payload == null) {
+ scheme = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME);
+ payload = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD);
+ }
if(scheme!=null) {
this.scheme = scheme;
if(payload != null) {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java b/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java
new file mode 100644
index 0000000..08a763a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ZookeeperServerCnxnFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.utils;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperServerCnxnFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerCnxnFactory.class);
+ int _port;
+ NIOServerCnxnFactory _factory;
+
+ public ZookeeperServerCnxnFactory(int port, int maxClientCnxns) {
+ //port range
+ int max;
+ if (port <= 0) {
+ _port = 2000;
+ max = 65535;
+ } else {
+ _port = port;
+ max = port;
+ }
+
+ try {
+ _factory = new NIOServerCnxnFactory();
+ } catch (IOException e) {
+ _port = 0;
+ _factory = null;
+ e.printStackTrace();
+ throw new RuntimeException(e.getMessage());
+ }
+
+ //look for available port
+ for (; _port <= max; _port++) {
+ try {
+ _factory.configure(new InetSocketAddress(_port), maxClientCnxns);
+ LOG.debug("Zookeeper server successfully binded at port "+_port);
+ break;
+ } catch (BindException e1) {
+ } catch (IOException e2) {
+ _port = 0;
+ _factory = null;
+ e2.printStackTrace();
+ throw new RuntimeException(e2.getMessage());
+ }
+ }
+
+ if (_port > max) {
+ _port = 0;
+ _factory = null;
+ LOG.error("Failed to find a port for Zookeeper");
+ throw new RuntimeException("No port is available to launch an inprocess zookeeper.");
+ }
+ }
+
+ public int port() {
+ return _port;
+ }
+
+ public NIOServerCnxnFactory factory() {
+ return _factory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
index 4ebb667..d49371a 100644
--- a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
@@ -20,6 +20,7 @@ package storm.trident.drpc;
import backtype.storm.Config;
import backtype.storm.drpc.DRPCInvocationsClient;
import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
import backtype.storm.utils.ServiceRegistry;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.json.simple.JSONValue;
import storm.trident.drpc.ReturnResultsReducer.ReturnResultsState;
import storm.trident.operation.MultiReducer;
@@ -47,12 +49,13 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
}
}
boolean local;
-
+ Map conf;
Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
@Override
public void prepare(Map conf, TridentMultiReducerContext context) {
+ this.conf = conf;
local = conf.get(Config.STORM_CLUSTER_MODE).equals("local");
}
@@ -89,7 +92,11 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
}};
if(!_clients.containsKey(server)) {
- _clients.put(server, new DRPCInvocationsClient(host, port));
+ try {
+ _clients.put(server, new DRPCInvocationsClient(conf, host, port));
+ } catch (TTransportException ex) {
+ throw new RuntimeException(ex);
+ }
}
client = _clients.get(server);
}
@@ -98,6 +105,8 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
client.result(id, result);
} catch(TException e) {
collector.reportError(e);
+ } catch (AuthorizationException aze) {
+ collector.reportError(aze);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java b/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java
new file mode 100644
index 0000000..ff3edb6
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/topology/state/TestTransactionalState.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package storm.trident.topology.state;
+
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.utils.ZookeeperAuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Facilitates testing of non-public methods in the parent class.
+ */
+public class TestTransactionalState extends TransactionalState {
+
+ /**
+ * Matching constructor in absence of a default constructor in the parent
+ * class.
+ */
+ protected TestTransactionalState(Map conf, String id, String subroot) {
+ super(conf, id, subroot);
+ }
+
+ public static void createNode(CuratorFramework curator,
+ String rootDir, byte[] data, List<ACL> acls, CreateMode mode)
+ throws Exception {
+ TransactionalState.createNode(curator, rootDir, data, acls, mode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
index 5325137..5fba1a2 100644
--- a/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
+++ b/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java
@@ -20,7 +20,12 @@ package storm.trident.topology.state;
import backtype.storm.Config;
import backtype.storm.utils.Utils;
+import backtype.storm.utils.ZookeeperAuthInfo;
+
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.PathAndBytesable;
+
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,10 +33,14 @@ import java.util.List;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
import org.json.simple.JSONValue;
public class TransactionalState {
CuratorFramework _curator;
+ List<ACL> _zkAcls = null;
public static TransactionalState newUserState(Map conf, String id) {
return new TransactionalState(conf, id, "user");
@@ -44,24 +53,53 @@ public class TransactionalState {
protected TransactionalState(Map conf, String id, String subroot) {
try {
conf = new HashMap(conf);
- String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
+ String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
+ String rootDir = transactionalRoot + "/" + id + "/" + subroot;
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port);
+ ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
+ CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
+ _zkAcls = Utils.getWorkerACL(conf);
+ try {
+ TransactionalState.createNode(initter, transactionalRoot, null, null, null);
+ } catch (KeeperException.NodeExistsException e) {
+ }
try {
- initter.create().creatingParentsIfNeeded().forPath(rootDir);
- } catch(KeeperException.NodeExistsException e) {
-
+ TransactionalState.createNode(initter, rootDir, null, _zkAcls, null);
+ } catch (KeeperException.NodeExistsException e) {
}
-
initter.close();
- _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
+ _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+
+ protected static String forPath(PathAndBytesable<String> builder,
+ String path, byte[] data) throws Exception {
+ return (data == null)
+ ? builder.forPath(path)
+ : builder.forPath(path, data);
+ }
+
+ protected static void createNode(CuratorFramework curator, String path,
+ byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+ ProtectACLCreateModePathAndBytesable<String> builder =
+ curator.create().creatingParentsIfNeeded();
+ if (acls == null) {
+ if (mode == null ) {
+ TransactionalState.forPath(builder, path, data);
+ } else {
+ TransactionalState.forPath(builder.withMode(mode), path, data);
+ }
+ return;
+ }
+
+ TransactionalState.forPath(builder.withACL(acls), path, data);
+ }
+
public void setData(String path, Object obj) {
path = "/" + path;
byte[] ser;
@@ -74,10 +112,8 @@ public class TransactionalState {
if(_curator.checkExists().forPath(path)!=null) {
_curator.setData().forPath(path, ser);
} else {
- _curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, ser);
+ TransactionalState.createNode(_curator, path, ser, _zkAcls,
+ CreateMode.PERSISTENT);
}
} catch(Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/.autom4te.cfg
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/.autom4te.cfg b/storm-core/src/native/worker-launcher/.autom4te.cfg
new file mode 100644
index 0000000..1ec584f
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/.autom4te.cfg
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# autom4te configuration for worker-launcher
+#
+
+begin-language: "Autoheader-preselections"
+args: --no-cache
+end-language: "Autoheader-preselections"
+
+begin-language: "Automake-preselections"
+args: --no-cache
+end-language: "Automake-preselections"
+
+begin-language: "Autoreconf-preselections"
+args: --no-cache
+end-language: "Autoreconf-preselections"
+
+begin-language: "Autoconf-without-aclocal-m4"
+args: --no-cache
+end-language: "Autoconf-without-aclocal-m4"
+
+begin-language: "Autoconf"
+args: --no-cache
+end-language: "Autoconf"
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po b/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po
new file mode 100644
index 0000000..9ce06a8
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/.deps/worker-launcher.Po
@@ -0,0 +1 @@
+# dummy
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/Makefile.am
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/Makefile.am b/storm-core/src/native/worker-launcher/Makefile.am
new file mode 100644
index 0000000..c9183c0
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/Makefile.am
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+AM_CFLAGS=-I$(srcdir)/impl -Wall -g -Werror
+
+# Define the programs that need to be built
+bin_PROGRAMS = worker-launcher
+check_PROGRAMS = test-worker-launcher
+
+TESTS = test-worker-launcher
+
+# Define the sources for the common files
+common_SOURCES = impl/configuration.c impl/worker-launcher.c
+
+# Define the sources for the real executable
+worker_launcher_SOURCES = $(common_SOURCES) impl/main.c
+
+# Define the sources for the test executable
+test_worker_launcher_SOURCES = $(common_SOURCES) test/test-worker-launcher.c
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/configure.ac
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/configure.ac b/storm-core/src/native/worker-launcher/configure.ac
new file mode 100644
index 0000000..ab1ef49
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/configure.ac
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -*- Autoconf -*-
+# Process this file with autoconf to produce a configure script.
+
+AC_PREREQ(2.59)
+AC_INIT(worker-launcher, 1.0.0, storm-user@googlegroups.com)
+AC_GNU_SOURCE
+
+AM_INIT_AUTOMAKE([subdir-objects foreign no-dist])
+
+AC_CONFIG_SRCDIR([impl/worker-launcher.c])
+AC_CONFIG_FILES([Makefile])
+
+AC_PREFIX_DEFAULT(`pwd`/../install)
+
+# Checks for programs.
+AC_PROG_CC
+AM_PROG_CC_C_O
+
+# Checks for libraries.
+
+# Checks for header files.
+AC_LANG(C)
+AC_CHECK_HEADERS([unistd.h])
+
+# Checks for typedefs, structures, and compiler characteristics.
+AC_HEADER_STDBOOL
+AC_C_CONST
+AC_TYPE_OFF_T
+AC_TYPE_SIZE_T
+AC_FUNC_STRERROR_R
+
+# Checks for library functions.
+AC_CHECK_FUNCS([mkdir uname])
+AC_OUTPUT
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/native/worker-launcher/impl/configuration.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/configuration.c b/storm-core/src/native/worker-launcher/impl/configuration.c
new file mode 100644
index 0000000..7b7a3c1
--- /dev/null
+++ b/storm-core/src/native/worker-launcher/impl/configuration.c
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ensure we get the posix version of dirname by including this first
+#include <libgen.h>
+
+#include "configuration.h"
+#include "worker-launcher.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#define MAX_SIZE 10
+
+struct confentry {
+ const char *key;
+ const char *value;
+};
+
+struct configuration {
+ int size;
+ struct confentry **confdetails;
+};
+
+struct configuration config={.size=0, .confdetails=NULL};
+
+/**
+ * Release everything held by the global configuration and reset it to the
+ * empty state (size 0, no entry array).
+ *
+ * The entry array is released whenever it exists, even if it holds no
+ * entries, so an allocated-but-empty configuration is not leaked. A missing
+ * array or a NULL slot is tolerated.
+ */
+void free_configurations() {
+  int i = 0;
+  if (config.confdetails != NULL) {
+    for (i = 0; i < config.size; i++) {
+      struct confentry *entry = config.confdetails[i];
+      if (entry == NULL) {
+        continue;
+      }
+      // free(NULL) is a no-op, so key and value need no separate checks
+      free((void *)entry->key);
+      entry->key = NULL;
+      free((void *)entry->value);
+      entry->value = NULL;
+      free(entry);
+      config.confdetails[i] = NULL;
+    }
+    free(config.confdetails);
+    config.confdetails = NULL;
+  }
+  config.size = 0;
+}
+
+/**
+ * Check that a file or directory is owned by uid 0 and carries no group or
+ * world write bit.
+ * Returns 1 when both conditions hold. Returns 0 otherwise, or when the path
+ * cannot be stat'ed; the reason is reported on ERRORFILE.
+ */
+static int is_only_root_writable(const char *file) {
+  struct stat info;
+  int root_only = 0;
+
+  if (stat(file, &info) != 0) {
+    fprintf(ERRORFILE, "Can't stat file %s - %s\n", file, strerror(errno));
+  } else if (info.st_uid != 0) {
+    fprintf(ERRORFILE, "File %s must be owned by root, but is owned by %d\n",
+            file, info.st_uid);
+  } else if ((info.st_mode & (S_IWGRP | S_IWOTH)) != 0) {
+    fprintf(ERRORFILE,
+            "File %s must not be world or group writable, but is %03o\n",
+            file, info.st_mode & (~S_IFMT));
+  } else {
+    root_only = 1;
+  }
+  return root_only;
+}
+
+/**
+ * Ensure that the configuration file and all of the containing directories
+ * are only writable by root. Otherwise, an attacker can change the
+ * configuration and potentially cause damage.
+ *
+ * The path is walked upwards with dirname(). For an absolute path the walk
+ * stops when "/" is reached ("/" itself is not checked unless it was the
+ * path passed in). For a relative path the walk stops after "." has been
+ * checked, since dirname(".") is "." and would never become "/".
+ *
+ * returns 0 if permissions are ok, -1 otherwise (including a NULL file name
+ * or an allocation failure)
+ */
+int check_configuration_permissions(const char* file_name) {
+  if (file_name == NULL) {
+    fprintf(ERRORFILE, "Null configuration filename passed in\n");
+    return -1;
+  }
+  // copy the input so that we can modify it with dirname
+  char* buffer = strndup(file_name, PATH_MAX);
+  if (buffer == NULL) {
+    fprintf(ERRORFILE, "malloc failed while checking configuration file.\n");
+    return -1;
+  }
+  // dirname may return a pointer into buffer or to static storage, so keep
+  // the original allocation in buffer for the final free
+  char* dir = buffer;
+  int result = 0;
+  do {
+    if (!is_only_root_writable(dir)) {
+      result = -1;
+      break;
+    }
+    if (strcmp(dir, ".") == 0) {
+      // top of a relative path: going further up would loop forever
+      break;
+    }
+    dir = dirname(dir);
+  } while (strcmp(dir, "/") != 0);
+  free(buffer);
+  return result;
+}
+
+/**
+ * Load the key=value pairs of the secure configuration file into the global
+ * configuration.
+ *
+ * Lines starting with '#' and lines without any token before '=' are
+ * skipped. A pair whose value starts with '#' is treated as commented out
+ * and is not stored.
+ *
+ * The process exits with INVALID_CONFIG_FILE when the file name is NULL, the
+ * file cannot be opened, a read error occurs, or no pair was loaded. It exits
+ * with OUT_OF_MEMORY when the entry array or a line buffer cannot be
+ * allocated. A key without a value, or a later allocation failure, discards
+ * everything read so far and returns with an empty configuration.
+ */
+void read_config(const char* file_name) {
+  FILE *conf_file;
+  char *line = NULL;
+  char *equaltok;
+  char *temp_equaltok;
+  size_t linesize = 1000;
+  ssize_t size_read = 0;
+  // number of slots currently allocated in config.confdetails
+  int capacity = MAX_SIZE;
+  // entry under construction; stored in the array only once complete
+  struct confentry *entry = NULL;
+
+  if (file_name == NULL) {
+    fprintf(ERRORFILE, "Null configuration filename passed in\n");
+    exit(INVALID_CONFIG_FILE);
+  }
+
+  #ifdef DEBUG
+    fprintf(LOGFILE, "read_config :Conf file name is : %s \n", file_name);
+  #endif
+
+  //allocate space for ten configuration items.
+  config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
+      * capacity);
+  if (config.confdetails == NULL) {
+    fprintf(ERRORFILE, "malloc failed while reading configuration file.\n");
+    exit(OUT_OF_MEMORY);
+  }
+  config.size = 0;
+  conf_file = fopen(file_name, "r");
+  if (conf_file == NULL) {
+    fprintf(ERRORFILE, "Invalid conf file provided : %s \n", file_name);
+    exit(INVALID_CONFIG_FILE);
+  }
+  while(!feof(conf_file)) {
+    line = (char *) malloc(linesize);
+    if(line == NULL) {
+      fprintf(ERRORFILE, "malloc failed while reading configuration file.\n");
+      exit(OUT_OF_MEMORY);
+    }
+    size_read = getline(&line, &linesize, conf_file);
+
+    //feof returns true only after we read past EOF, so a file whose last
+    //line has no trailing newline still reaches this point; on a negative
+    //result distinguish end of file from a read error
+    if (size_read == -1) {
+      free(line);
+      line = NULL;
+      if(!feof(conf_file)){
+        exit(INVALID_CONFIG_FILE);
+      }
+      break;
+    }
+    //trim the ending new line; the length guard keeps a zero-length string
+    //(line starting with a NUL byte) from indexing line[-1]
+    const size_t line_len = strlen(line);
+    if (line_len > 0 && line[line_len - 1] == '\n') {
+      line[line_len - 1] = '\0';
+    }
+    //comment line
+    if(line[0] == '#') {
+      free(line);
+      line = NULL;
+      continue;
+    }
+    //tokenize first to get key and list of values.
+    //if no equals is found ignore this line, can be an empty line also
+    equaltok = strtok_r(line, "=", &temp_equaltok);
+    if(equaltok == NULL) {
+      free(line);
+      line = NULL;
+      continue;
+    }
+    entry = (struct confentry *) malloc(sizeof(struct confentry));
+    if (entry == NULL) {
+      fprintf(LOGFILE,
+          "Failed allocating memory for single configuration item\n");
+      goto cleanup;
+    }
+    memset(entry, 0, sizeof(struct confentry));
+
+    #ifdef DEBUG
+      fprintf(LOGFILE, "read_config : Adding conf key : %s \n", equaltok);
+    #endif
+
+    const size_t key_tok_len = strlen(equaltok);
+    char *key_copy = (char *) malloc(sizeof(char) * (key_tok_len + 1));
+    if (key_copy == NULL) {
+      fprintf(LOGFILE,
+          "Failed allocating memory for single configuration item\n");
+      goto cleanup;
+    }
+    memcpy(key_copy, equaltok, key_tok_len + 1);
+    entry->key = key_copy;
+
+    equaltok = strtok_r(NULL, "=", &temp_equaltok);
+    if (equaltok == NULL) {
+      fprintf(LOGFILE, "configuration tokenization failed \n");
+      goto cleanup;
+    }
+    //means value is commented so don't store the key
+    if(equaltok[0] == '#') {
+      free(line);
+      line = NULL;
+      free((void *)entry->key);
+      free(entry);
+      entry = NULL;
+      continue;
+    }
+
+    #ifdef DEBUG
+      fprintf(LOGFILE, "read_config : Adding conf value : %s \n", equaltok);
+    #endif
+
+    const size_t val_tok_len = strlen(equaltok);
+    char *value_copy = (char *) malloc(sizeof(char) * (val_tok_len + 1));
+    if (value_copy == NULL) {
+      fprintf(LOGFILE,
+          "Failed allocating memory for single configuration item\n");
+      goto cleanup;
+    }
+    memcpy(value_copy, equaltok, val_tok_len + 1);
+    entry->value = value_copy;
+
+    //grow the array BEFORE storing, so index config.size is always inside
+    //the allocation; a temporary keeps the old array reachable (and
+    //freeable) if realloc fails
+    if (config.size == capacity) {
+      struct confentry **grown = (struct confentry **) realloc(
+          config.confdetails,
+          sizeof(struct confentry *) * (capacity + MAX_SIZE));
+      if (grown == NULL) {
+        fprintf(LOGFILE,
+            "Failed re-allocating memory for configuration items\n");
+        goto cleanup;
+      }
+      config.confdetails = grown;
+      capacity += MAX_SIZE;
+    }
+    config.confdetails[config.size] = entry;
+    config.size++;
+    entry = NULL;
+    free(line);
+    line = NULL;
+  }
+
+  //close the file
+  fclose(conf_file);
+
+  if (config.size == 0) {
+    fprintf(ERRORFILE, "Invalid configuration provided in %s\n", file_name);
+    exit(INVALID_CONFIG_FILE);
+  }
+  return;
+
+  //failure path: release the line buffer, the half-built entry, and
+  //everything already stored
+  cleanup:
+  if (line != NULL) {
+    free(line);
+    line = NULL;
+  }
+  if (entry != NULL) {
+    free((void *)entry->key);
+    free((void *)entry->value);
+    free(entry);
+    entry = NULL;
+  }
+  fclose(conf_file);
+  free_configurations();
+  return;
+}
+
+/*
+ * Look up a configuration value by key.
+ * Scans the entries currently held in the global configuration and returns
+ * a strdup'ed copy of the value of the first entry whose key matches, or
+ * NULL when no entry matches. The copy belongs to the caller.
+ */
+char * get_value(const char* key) {
+  int idx = 0;
+  while (idx < config.size) {
+    const struct confentry *entry = config.confdetails[idx];
+    if (strcmp(entry->key, key) == 0) {
+      return strdup(entry->value);
+    }
+    idx++;
+  }
+  return NULL;
+}
+
+/**
+ * Look up a key and split its value into an array of strings.
+ * The copy returned by get_value is handed straight to extract_values,
+ * which splits it on commas.
+ */
+char ** get_values(const char * key) {
+  return extract_values(get_value(key));
+}
+
+/**
+ * Extracts array of values from the comma separated list of values.
+ *
+ * The input string is modified in place and the returned elements point
+ * into it. The returned array is always terminated by a NULL element, even
+ * when the string holds no value at all. Leading commas are removed first,
+ * so that the first element, when there is one, is the start of the input
+ * buffer; free_values relies on that to release the buffer.
+ *
+ * Returns NULL when value is NULL or when memory cannot be allocated.
+ */
+char ** extract_values(char *value) {
+  char ** toPass = NULL;
+  char *tempTok = NULL;
+  char *tempstr = NULL;
+  int size = 0;
+  int toPassSize = MAX_SIZE;
+
+  if (value == NULL) {
+    return NULL;
+  }
+
+  //first allocate an array of MAX_SIZE pointers
+  toPass = (char **) malloc(sizeof(char *) * toPassSize);
+  if (toPass == NULL) {
+    return NULL;
+  }
+
+  //strtok_r skips leading delimiters, which would leave the first token
+  //pointing inside the buffer; shift the text left instead
+  const size_t leading = strspn(value, ",");
+  if (leading > 0) {
+    memmove(value, value + leading, strlen(value + leading) + 1);
+  }
+
+  tempTok = strtok_r(value, ",", &tempstr);
+  while (tempTok != NULL) {
+    toPass[size++] = tempTok;
+    //growing as soon as the array is full keeps a slot for the terminator
+    if(size == toPassSize) {
+      char **grown;
+      toPassSize += MAX_SIZE;
+      grown = (char **) realloc(toPass, (sizeof(char *) * toPassSize));
+      if (grown == NULL) {
+        free(toPass);
+        return NULL;
+      }
+      toPass = grown;
+    }
+    tempTok = strtok_r(NULL, ",", &tempstr);
+  }
+  toPass[size] = NULL;
+  return toPass;
+}
+
+/**
+ * Free an array of values.
+ * Releases the first element and then the array itself; the remaining
+ * elements are not freed individually. As produced by extract_values, the
+ * first element is the start of the single buffer that all elements point
+ * into.
+ * A NULL array is accepted and ignored.
+ */
+void free_values(char** values) {
+  // check the array pointer before looking at its first element
+  if (values == NULL) {
+    return;
+  }
+  if (*values != NULL) {
+    free(*values);
+    *values = NULL;
+  }
+  free(values);
+}