Posted to dev@storm.apache.org by knusbaum <gi...@git.apache.org> on 2015/10/30 23:28:12 UTC

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

GitHub user knusbaum opened a pull request:

    https://github.com/apache/storm/pull/838

    [STORM-855] Heartbeat Server (Pacemaker)

    This pull request redirects worker heartbeats away from ZooKeeper and into a new server, 'Pacemaker'.
    
    The redirection is accomplished by making `ClusterState` pluggable through the `ClusterStateFactory` interface.
    
    By default, Pacemaker is not enabled. It can be enabled by changing `storm.cluster.state.store` from its default value of `"backtype.storm.cluster_state.zookeeper_state_factory"` to `"org.apache.storm.pacemaker.pacemaker_state_factory"`.
    
    Pacemaker includes both digest-based and Kerberos-based security, but the security support is primitive.
    
    Pacemaker is not yet highly available (HA). If it fails, however, Nimbus will NOT start killing and reassigning workers, so Pacemaker going down won't bring down a cluster. It does need to be brought back up before new jobs can be submitted, though.
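
As a rough illustration of the switch described above, the following Java sketch builds the relevant settings as a plain map. The `storm.cluster.state.store` key and the two factory names come from the description; the `pacemaker.host` and `pacemaker.port` keys come from the `Config.java` diff quoted later in this thread. The class name, the helper methods, and the host and port values are assumptions made for the example, not part of the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: models the settings as a plain map, not Storm's real Config class.
class PacemakerConfigSketch {
    // Key and factory names as quoted in the pull request description.
    static final String STATE_STORE_KEY = "storm.cluster.state.store";
    static final String ZOOKEEPER_FACTORY = "backtype.storm.cluster_state.zookeeper_state_factory";
    static final String PACEMAKER_FACTORY = "org.apache.storm.pacemaker.pacemaker_state_factory";

    // Returns a map that selects the Pacemaker-backed state store.
    // host and port are example values supplied by the caller.
    static Map<String, Object> pacemakerConf(String host, int port) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(STATE_STORE_KEY, PACEMAKER_FACTORY);
        conf.put("pacemaker.host", host);
        conf.put("pacemaker.port", port);
        return conf;
    }

    // An unset key falls back to the ZooKeeper factory, mirroring the stated default.
    static boolean usesPacemaker(Map<String, Object> conf) {
        Object factory = conf.getOrDefault(STATE_STORE_KEY, ZOOKEEPER_FACTORY);
        return PACEMAKER_FACTORY.equals(factory);
    }
}
```

In a real deployment these entries would presumably live in the cluster's configuration file rather than in code; the map form is used here only to keep the example self-contained.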

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/knusbaum/incubator-storm STORM-855

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #838
    
----
commit 444ec05e5a9f38f9a9472c54b39f1371c839683b
Author: Kyle Nusbaum <ky...@gmail.com>
Date:   2015-10-30T22:21:27Z

    PACEMAKER OPEN SOURCE!

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45116436
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -778,6 +792,47 @@
         public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
     
         /**
    +     * The host that Pacemaker is running on.
    +     */
    +    @isString
    +    public static final String PACEMAKER_HOST = "pacemaker.host";
    +
    +    /**
    +     * The port Pacemaker should run on. Clients should
    +     * connect to this port to submit or read heartbeats.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_PORT = "pacemaker.port";
    +
    +    /**
    +     * The maximum number of threads that should be used by the Pacemaker.
    +     * When Pacemaker gets loaded it will spawn new threads, up to 
    +     * this many total, to handle the load.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";
    +
    +    /**
    +     * This parameter is used by the storm-deploy project to configure the
    +     * jvm options for the nimbus daemon.
    --- End diff --
    
    *nimbus* daemon


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157542893
  
    @ptgoetz I added some documentation at docs/documentation/Pacemaker.md


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r44965169
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +
    +;; Stats Functions
    +
    +(def sleep-seconds 5)
    +
    +
    +(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
    +                                            :or {compare (fn [new old] true)
    +                                                 new-fn (fn [new old] new)}}]
    +  (loop []
    +    (let [old (.get (key stats))
    +          new (new-fn new old)]
    +      (if (compare new old)
    +        (if (.compareAndSet (key stats) old new)
    +          nil
    +          (recur))
    +        nil))))
    +
    +(defn- set-average [stats size]
    +  (check-and-set-loop
    +   stats
    +   :average-heartbeat-size
    +   size
    +   :new-fn (fn [new old]
    +            (let [count (.get (:send-pulse-count stats))]
    +                                        ; Weighted average
    +              (/ (+ new (* count old)) (+ count 1))))))
    +
    +(defn- set-largest [stats size]
    +  (check-and-set-loop
    +   stats
    +   :largest-heartbeat-size
    +   size
    +   :compare #'>))
    +
    +(defn- report-stats [heartbeats stats last-five-s]
    +  (loop []
    +      (let [send-count (.getAndSet (:send-pulse-count stats) 0)
    +            received-size (.getAndSet (:total-received-size stats) 0)
    +            get-count (.getAndSet (:get-pulse-count stats) 0)
    +            sent-size (.getAndSet (:total-sent-size stats) 0)
    +            largest (.getAndSet (:largest-heartbeat-size stats) 0)
    +            average (.getAndSet (:average-heartbeat-size stats) 0)
    +            total-keys (.size heartbeats)]
    +        (log-message "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n"
    --- End diff --
    
    Consider changing this to debug level so it is less verbose.
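
For readers less familiar with Clojure, the `check-and-set-loop` helper in the diff above is a compare-and-set retry loop over atomic counters. The Java sketch below shows the same idea for the "largest heartbeat size" statistic only. The class and method names are hypothetical, and the use of `AtomicLong` is an assumption (the diff imports `AtomicInteger`).

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: tracks the largest value seen, without locks.
class HeartbeatStatsSketch {
    private final AtomicLong largest = new AtomicLong(0);

    // Retry until the new maximum is installed, or until the stored value
    // is already at least as large as the candidate.
    void recordSize(long size) {
        while (true) {
            long old = largest.get();
            if (size <= old) {
                return; // nothing to update
            }
            if (largest.compareAndSet(old, size)) {
                return; // our value won the race
            }
            // Another thread changed the value in between; read again and retry.
        }
    }

    long largest() {
        return largest.get();
    }

    // Mirrors the reporting step in the diff, which reads each counter and resets it to 0.
    long getAndResetLargest() {
        return largest.getAndSet(0);
    }
}
```

The retry is what makes the update safe without a lock: a failed `compareAndSet` means another thread wrote first, so the comparison has to be redone against the fresh value.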


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45510787
  
    --- Diff: storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java ---
    @@ -0,0 +1,28 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.cluster;
    +
    +import clojure.lang.APersistentMap;
    +import java.util.List;
    +import org.apache.zookeeper.data.ACL;
    +
    +public interface ClusterStateFactory {
    +    
    +    public ClusterState mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context);
    --- End diff --
    
    minor: `public` not needed here.
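
The point behind this comment is that methods declared in a Java interface are implicitly `public` and, unless they have a body, `abstract`, so the modifier is redundant. A small sketch that checks this through reflection follows; `StateFactory` is a hypothetical stand-in for `ClusterStateFactory` with a simplified signature.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Sketch only: shows that an interface method without modifiers is still public.
class InterfaceModifierSketch {
    // Hypothetical stand-in for ClusterStateFactory; note the missing 'public'.
    interface StateFactory {
        String mkState(String config);
    }

    static boolean isImplicitlyPublicAbstract() {
        try {
            Method m = StateFactory.class.getMethod("mkState", String.class);
            int mods = m.getModifiers();
            return Modifier.isPublic(mods) && Modifier.isAbstract(mods);
        } catch (NoSuchMethodException e) {
            // getMethod only finds public methods, so this would mean the claim is false.
            throw new IllegalStateException(e);
        }
    }
}
```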


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515492
  
    --- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
    @@ -68,6 +74,37 @@ public static Configuration GetConfiguration(Map storm_conf) {
         }
     
         /**
    +     * Pull a set of keys out of a Configuration.
    +     * @param configs_to_pull A set of config keys that you want the values of.
    --- End diff --
    
    There isn't a `configs_to_pull` param.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45128439
  
    --- Diff: docs/documentation/Pacemaker.md ---
    @@ -0,0 +1,89 @@
    +# Pacemaker
    +
    +### Introduction
    +Pacemaker is a storm daemon designed to process heartbeats from workers. As Storm is scaled up, ZooKeeper begins to become a bottleneck due to high volumes of writes from workers doing heartbeats. Lots of writes to disk and traffic across the network is generated as ZooKeeper tries to maintain consistency.
    --- End diff --
    
    `Lots of writes to disk and lots of traffic`...


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559925
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -281,6 +296,10 @@ private Channel getConnectedChannel() {
                 }
                 return null;
             }
    +        }
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559884
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -202,7 +217,7 @@ public Status status() {
         @Override
         public Iterator<TaskMessage> recv(int flags, int clientId) {
             throw new UnsupportedOperationException("Client connection should not receive any messages");
    -    }
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45510242
  
    --- Diff: storm-core/src/jvm/backtype/storm/cluster/ClusterState.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.cluster;
    +
    +import clojure.lang.APersistentMap;
    +import clojure.lang.IFn;
    +import java.util.List;
    +import org.apache.zookeeper.data.ACL;
    +
    +/**
    + * ClusterState provides the API for the pluggable state store used by the
    + * Storm daemons. Data is stored in path/value format, and the store supports
    + * listing sub-paths at a given path.
    + * All data should be available across all nodes with eventual consistency.
    + *
    + * IMPORTANT NOTE:
    + * Heartbeats have different api calls used to interact with them. The root
    + * path (/) may or may not be the same as the root path for the other api calls.
    + *
    + * For example, performing these two calls:
    + *     set_data("/path", data, acls);
    + *     void set_worker_hb("/path", heartbeat, acls);
    + * may or may not cause a collision in "/path".
    + * Never use the same paths with the *_hb* methods as you do with the others.
    + */
    +public interface ClusterState {
    +
    +    /**
    +     * Registers a callback function that gets called when CuratorEvents happen.
    +     * @param callback is a clojure IFn that accepts the type - translated to
    +     * clojure keyword as in zookeeper.clj - and the path: (callback type path)
    +     * @return is an id that can be passed to unregister(...) to unregister the
    +     * callback.
    +     */
    +    String register(IFn callback);
    +
    +    /**
    +     * Unregisters a callback function that was registered with register(...).
    +     * @param id is the String id that was returned from register(...).
    +     */
    +    void unregister(String id);
    +
    +    /**
    +     * Path will be appended with a monotonically increasing integer, a new node
    +     * will be created there, and data will be put at that node.
    +     * @param path The path that the monotonically increasing integer suffix will
    +     * be added to.
    +     * @param data The data that will be written at the suffixed path's node.
    +     * @param acls The acls to apply to the path. May be null.
    +     * @return The path with the integer suffix appended.
    +     */
    +    String create_sequential(String path, byte[] data, List<ACL> acls);
    +
    +    /**
    +     * Creates nodes for path and all its parents. Path elements are separated by
    +     * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
    +     * @param path The path to create, along with all its parents.
    +     * @param acls The acls to apply to the path. May be null.
    +     * @return path
    +     */
    +    String mkdirs(String path, List<ACL> acls);
    +
    +    /**
    +     * Deletes the node at a given path, and any child nodes that may exist.
    +     * @param path The path to delete
    +     */
    +    void delete_node(String path);
    +
    +    /**
    +     * Creates an ephemeral node at path. Ephemeral nodes are destroyed
    +     * by the store when the client disconnects.
    +     * @param path The path where a node will be created.
    +     * @param data The data to be written at the node.
    +     * @param acls The acls to apply to the path. May be null.
    +     * @return path
    +     */
    +    void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
    --- End diff --
    
    Doc says this is supposed to return the path?
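
To make the contract in the `ClusterState` Javadoc more concrete, here is a minimal in-memory sketch of a path/value store with `mkdir -p` style creation, child listing, and a separate namespace for heartbeats. It is not the implementation in this pull request: the class name, the camel-case method names, and the decision to keep heartbeats in their own map are all assumptions chosen to illustrate why the note warns against reusing paths across the two sets of calls.

```java
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch only: a single-process stand-in for a pluggable state store.
class InMemoryStateSketch {
    private final TreeMap<String, byte[]> data = new TreeMap<>();
    // Heartbeats live in a separate map, so their root differs from the data root.
    private final TreeMap<String, byte[]> heartbeats = new TreeMap<>();

    // Creates the node at path and every missing parent, like mkdir -p.
    String mkdirs(String path) {
        StringBuilder prefix = new StringBuilder();
        for (String part : path.split("/")) {
            if (part.isEmpty()) {
                continue; // skip the empty segment before the leading "/"
            }
            prefix.append('/').append(part);
            data.putIfAbsent(prefix.toString(), new byte[0]);
        }
        return path;
    }

    void setData(String path, byte[] value) {
        mkdirs(path);
        data.put(path, value);
    }

    void setWorkerHb(String path, byte[] value) {
        heartbeats.put(path, value);
    }

    byte[] getData(String path) {
        return data.get(path);
    }

    byte[] getWorkerHb(String path) {
        return heartbeats.get(path);
    }

    // Lists the names of the direct children of path in the data namespace.
    SortedSet<String> getChildren(String path) {
        String prefix = path.endsWith("/") ? path : path + "/";
        SortedSet<String> children = new TreeSet<>();
        for (String key : data.keySet()) {
            if (key.startsWith(prefix)) {
                String rest = key.substring(prefix.length());
                if (!rest.isEmpty() && !rest.contains("/")) {
                    children.add(rest);
                }
            }
        }
        return children;
    }
}
```

In this sketch the same path never collides across the two namespaces. A store that kept everything under one root would collide, which is why callers cannot rely on either behavior.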


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515535
  
    --- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
    @@ -68,6 +74,37 @@ public static Configuration GetConfiguration(Map storm_conf) {
         }
     
         /**
    +     * Pull a set of keys out of a Configuration.
    +     * @param configs_to_pull A set of config keys that you want the values of.
    +     * @param conf The config to pull the key/value pairs out of.
    +     * @param conf_entry The app configuration entry name to get stuff from.
    +     * @return Return a map of the configs in configs_to_pull to their values.
    +     */
    +    public static SortedMap<String, ?> PullConfig(Configuration conf,
    +                                            String conf_entry) throws IOException {
    +        if(conf == null) {
    +            return null;
    +        }
    +        AppConfigurationEntry configurationEntries[] = conf.getAppConfigurationEntry(conf_entry);
    +        if(configurationEntries == null) {
    +            String errorMessage = "Could not find a '" + conf_entry
    +                + "' entry in this configuration: Client cannot start.";
    +            throw new IOException(errorMessage);
    +        }
    +
    +        TreeMap<String, Object> results = new TreeMap<>();
    +        
    +
    --- End diff --
    
    extra line
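
The quoted hunk stops before the body of `PullConfig`, so the rest of the method is not visible here. As a hedged illustration of what reading a JAAS section into a sorted map can look like, the sketch below uses only standard `javax.security.auth.login` calls. The loop over entries is an assumption about the elided code, and `singleSection` and the login module name are hypothetical helpers for the example. The Javadoc lists only the parameters that exist, in line with the earlier review comment.

```java
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

// Sketch only: not the method from the pull request.
class PullConfigSketch {
    /**
     * Collects the options of every entry in one JAAS section.
     * @param conf The JAAS configuration to read; may be null.
     * @param confEntry The name of the section to read.
     * @return A sorted map of option names to values, or null if conf is null.
     * @throws IOException if the section does not exist.
     */
    static SortedMap<String, Object> pullConfig(Configuration conf, String confEntry)
            throws IOException {
        if (conf == null) {
            return null;
        }
        AppConfigurationEntry[] entries = conf.getAppConfigurationEntry(confEntry);
        if (entries == null) {
            throw new IOException("Could not find a '" + confEntry
                    + "' entry in this configuration.");
        }
        TreeMap<String, Object> results = new TreeMap<>();
        for (AppConfigurationEntry entry : entries) {
            results.putAll(entry.getOptions()); // later entries overwrite earlier ones
        }
        return results;
    }

    // Hypothetical in-memory Configuration holding a single named section.
    static Configuration singleSection(final String name, final Map<String, ?> options) {
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String section) {
                if (!name.equals(section)) {
                    return null;
                }
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry("com.example.HypotheticalLoginModule",
                            LoginModuleControlFlag.REQUIRED, options)
                };
            }
        };
    }
}
```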


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r44160889
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java ---
    @@ -0,0 +1,212 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslClient;
    +import javax.security.sasl.SaslException;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implements SASL logic for storm worker client processes.
    + */
    +public class KerberosSaslNettyClient {
    +
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(KerberosSaslNettyClient.class);
    +
    +    /**
    +     * Used to respond to server's counterpart, SaslServer with SASL tokens
    +     * represented as byte arrays.
    +     */
    +    private SaslClient saslClient;
    +    private Subject subject;
    +    private String jaas_section;
    +    
    +    /**
    +     * Create a KerberosSaslNettyClient for authentication with servers.
    +     */
    +    public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
    +        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
    +                  SaslUtils.KERBEROS);
    +        
    +        LOG.info("Creating Kerberos Client.");
    +        
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
    +        
    +        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
    +        
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf); 
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Client failed to login in principal:" + ex, ex);
    +            throw new RuntimeException(ex);
    +        }
    +        
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
    +            LOG.error("Failed to verify user principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \"" +
    +                                       jaas_section +
    +                                       "\" in login configuration file " +
    +                                       login_conf);
    +        }
    +
    +        String serviceName = null;
    +        try {
    +            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
    +        }
    +        catch (IOException e) {
    +            LOG.error("Failed to get service name.", e);
    +            throw new RuntimeException(e);
    +        }
    +
    +        try {
    +            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
    +            final String fPrincipalName = principal.getName();
    +            KerberosName kerbName = new KerberosName(principal.getName());
    +            final String fHost = (String)storm_conf.get(Config.PACEMAKER_HOST);
    +            final String fServiceName = serviceName;
    +            final CallbackHandler fch = ch;
    +            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
    +            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
    +                    public SaslClient run() {
    +                        try {
    +                            Map<String, String> props = new TreeMap<String,String>();
    +                            props.put(Sasl.QOP, "auth");
    +                            props.put(Sasl.SERVER_AUTH, "false");
    +                            return Sasl.createSaslClient(
    +                                new String[] { SaslUtils.KERBEROS },
    +                                fPrincipalName,
    +                                fServiceName,
    +                                fHost,
    +                                props, fch);
    +                        }
    +                        catch (Exception e) {
    +                            LOG.error("Subject failed to create sasl client.", e);
    +                            return null;
    +                        }
    +                    }
    +                });
    +            LOG.info("Got Client: {}", saslClient);
    +            
    +        } catch (PrivilegedActionException e) {
    +            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
    +            throw new RuntimeException(e);
    +        }
    +}
    +
    +    public boolean isComplete() {
    +        return saslClient.isComplete();
    +    }
    +
    +    /**
    +     * Respond to server's SASL token.
    +     * 
    +     * @param saslTokenMessage
    +     *            contains server's SASL token
    +     * @return client's response SASL token
    +     */
    +    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
    +        try {
    +            final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
    +            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
    +                    public byte[] run() {
    +                        try {
    +                            byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
    +                                                                         .getSaslToken());
    +                            return retval;
    +                        } catch (SaslException e) {
    +                            LOG.error("saslResponse: Failed to respond to SASL server's token:",
    +                                      e);
    +                            throw new RuntimeException(e);
    +                        }
    +                    }
    +                });
    +            return retval;
    +        }
    +        catch (PrivilegedActionException e) {
    +            LOG.error("Failed to generate response for token: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Implementation of javax.security.auth.callback.CallbackHandler that works
    +     * with Storm topology tokens.
    +     */
    +    private static class SaslClientCallbackHandler implements CallbackHandler {
    +
    +        /**
    +         * Set private members using topology token.
    +         * 
    +         * @param topologyToken
    +         */
    +        public SaslClientCallbackHandler() {
    +        }
    --- End diff --
    
    Yes


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157057138
  
    @bastiliu There is a possibility of that. If you have a congested network, or if there are other issues around the Pacemaker node, it could become a bottleneck.  We don't see this as a silver bullet that will fix all of the problems, just a step in the right direction.  This is similar to what JStorm has done with the TopologyMaster to offload ZooKeeper, although quite different in its design.
    
    The talk I gave at Hadoop Summit about scaling Storm covers the calculations we did:
    https://www.youtube.com/watch?v=pB9d3tMM__k
    http://www.slideshare.net/Hadoop_Summit/from-gust-to-tempest-scaling-storm
    
    Theoretically, a single Pacemaker node with the current metrics level and gigabit Ethernet should be able to support about 6000 nodes.  The reality is probably closer to between 2000 and 3000 nodes.  With 10GigE we are looking at a lot more, and Nimbus is likely to be the bottleneck at that point.  But the real motivation is that this is a step toward moving metrics out of ZooKeeper and into a pluggable time-series database backend.  I have not looked deeply into the metrics changes that JStorm has made yet, so all of this code might end up being removed as part of the merger.  We mostly want to make sure that all of the code we write for Storm ends up in open source.
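
The kind of back-of-the-envelope estimate behind figures like these can be sketched as link bandwidth divided by the heartbeat traffic each node generates. The Java snippet below is only that arithmetic. The class name, the parameters, and every input value in the usage note are placeholders chosen for illustration; they are not the numbers from the talk and do not reproduce the 6000-node figure.

```java
// Sketch only: a bandwidth-bound estimate that ignores protocol overhead,
// reads by Nimbus, and every cost other than heartbeat writes.
class PacemakerCapacitySketch {
    static long maxNodes(long linkBitsPerSecond, long heartbeatBytes,
                         int workersPerNode, double heartbeatIntervalSeconds) {
        double linkBytesPerSecond = linkBitsPerSecond / 8.0;
        // Bytes one node sends per second across all of its workers.
        double bytesPerNodePerSecond =
                heartbeatBytes * workersPerNode / heartbeatIntervalSeconds;
        return (long) Math.floor(linkBytesPerSecond / bytesPerNodePerSecond);
    }
}
```

With placeholder inputs of a 1 Gbit/s link, 50,000-byte heartbeats, 10 workers per node, and one heartbeat per second, `maxNodes` returns 250; the same inputs on a 10 Gbit/s link give 2500. The estimate scales linearly with bandwidth, which fits the remark that a faster link shifts the bottleneck elsewhere.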


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45511484
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java ---
    @@ -57,14 +57,19 @@ int encodeLength() {
          * encode the current Control Message into a channel buffer
          * @throws Exception
          */
    -    ChannelBuffer buffer() throws IOException {
    +    public ChannelBuffer buffer() throws IOException {
    --- End diff --
    
    May as well fix the javadoc `@throws` here.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512326
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java ---
    @@ -0,0 +1,223 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import backtype.storm.security.auth.KerberosPrincipalToLocal;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosPrincipal;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.AuthorizeCallback;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslException;
    +import javax.security.sasl.SaslServer;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +class KerberosSaslNettyServer {
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslNettyServer.class);
    +
    +    private SaslServer saslServer;
    +    private Subject subject;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
    +        this.authorizedUsers = authorizedUsers;
    +        LOG.debug("Getting Configuration.");
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +
    +        LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
    +
    +        KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(storm_conf, authorizedUsers);
    +
    +        //login our principal
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf);
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Server failed to login in principal:", ex);
    +            throw new RuntimeException(ex);
    +        }
    +
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
    +            LOG.error("Failed to verifyuser principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \""
    +                                       + jaas_section
    +                                       + "\" in login configuration file "
    +                                       + login_conf);
    +        }
    +
    +        try {
    +            LOG.info("Creating Kerberos Server.");
    +            final CallbackHandler fch = ch;
    +            Principal p = (Principal)subject.getPrincipals().toArray()[0];
    +            KerberosName kName = new KerberosName(p.getName());
    +            final String fHost = kName.getHostName();
    +            final String fServiceName = kName.getServiceName();
    +            LOG.debug("Server with host: {}", fHost);
    +            saslServer =
    +                Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
    +                        public SaslServer run() {
    +                            try {
    +                                Map<String, String> props = new TreeMap<String,String>();
    +                                props.put(Sasl.QOP, "auth");
    +                                props.put(Sasl.SERVER_AUTH, "false");
    +                                return Sasl.createSaslServer(SaslUtils.KERBEROS,
    +                                                             fServiceName,
    +                                                             fHost, props, fch);
    +                            }
    +                            catch (Exception e) {
    +                                LOG.error("Subject failed to create sasl server.", e);
    +                                return null;
    +                            }
    +                        }
    +                    });
    +            LOG.info("Got Server: {}", saslServer);
    +
    +        } catch (PrivilegedActionException e) {
    +            LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    public boolean isComplete() {
    +        return saslServer.isComplete();
    +    }
    +
    +    public String getUserName() {
    +        return saslServer.getAuthorizationID();
    +    }
    +
    +    private String getPrincipal(Subject subject) {
    +        Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
    +        if (principals==null || principals.size()<1) {
    +            LOG.info("No principal found in login subject");
    +            return null;
    +        }
    +        return ((Principal)(principals.toArray()[0])).getName();
    +    }
    +
    +    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
    +    public static class KerberosSaslCallbackHandler implements CallbackHandler {
    +
    +        /** Used to authenticate the clients */
    +        private Map config;
    --- End diff --
    
    unused?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45128342
  
    --- Diff: docs/documentation/Pacemaker.md ---
    @@ -0,0 +1,89 @@
    +# Pacemaker
    +
    +### Introduction
    +Pacemaker is a storm daemon designed to process heartbeats from workers. As Storm is scaled up, ZooKeeper begins to become a bottleneck due to high volumes of writes from workers doing heartbeats. Lots of writes to disk and traffic across the network is generated as ZooKeeper tries to maintain consistency.
    +
    +Because heartbeats are of an ephemeral nature, they do not need to be persisted to disk or synced across nodes; an in-memory store will do. This is the role of Pacemaker. Pacemaker functions as a simple in-memory key/value store with ZooKeeper-like, directory-style keys and byte array values.
    +
    +The corresponding Pacemaker client is a plugin for the `ClusterState` interface, `org.apache.storm.pacemaker.pacemaker_state_factory`. Heartbeat calls are funneled by the `ClusterState` produced by `pacemaker_state_factory` into the Pacemaker daemon, while other set/get operations are forwarded to ZooKeeper.
    +
    +------
    +
    +### Configuration
    +
    + - `pacemaker.host` : The host that the Pacemaker daemon is running on
    + - `pacemaker.port` : The port that Pacemaker will listen on
    + - `pacemaker.max.threads` : Maximum number of threads Pacemaker daemon will use to handle requests.
    + - `pacemaker.childopts` : Any JVM parameters that need to go to the Pacemaker. (used by storm-deploy project)
    + - `pacemaker.auth.method` : The authentication method that is used (more info below)
    +
    +#### Example
    +
    +To get Pacemaker up and running, set the following option in the cluster config on all nodes:
    +```
    +storm.cluster.state.store: "org.apache.storm.pacemaker.pacemaker_state_factory"
    +```
    +
    +The Pacemaker host also needs to be set on all nodes:
    +```
    +pacemaker.host: somehost.mycompany.com
    +```
    +
    +And then start all of your daemons
    +
    +(including Pacemaker):
    +```
    +$ storm pacemaker
    +```
    +
    +The Storm cluster should now be pushing all worker heartbeats through Pacemaker.
    +
    +### Security
    +
    +Currently digest (password-based) and Kerberos security are supported. Security is currently only around reads, not writes. Writes may be performed by anyone, whereas reads may only be performed by authorized and authenticated users. This is an area for future development, as it leaves the cluster open to DoS attacks, but it prevents any sensitive information from reaching unauthorized eyes, which was the main goal.
    +
    +#### Digest
    +To configure digest authentication, set `pacemaker.auth.method: DIGEST` in the cluster config on the nodes hosting Nimbus and Pacemaker.
    +The nodes must also have `java.security.auth.login.config` set to point to a jaas config file containing the following structure:
    --- End diff --
    
    `JAAS` (I think this is all caps?)
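
Editorial sketch: the introduction in the quoted Pacemaker.md describes Pacemaker as a simple in-memory key/value store with ZooKeeper-like, directory-style keys and byte array values. The Java below is only an illustration of that idea under stated assumptions. The class name, method names, and example paths are hypothetical, and this is not Pacemaker's actual implementation (which lives in `pacemaker.clj`).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of an in-memory store with directory-style keys. */
final class InMemoryHeartbeatStoreSketch {

    // Nothing is persisted to disk or replicated; heartbeats are ephemeral.
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    /** Store a copy of the value under a path such as "/workerbeats/topo-1/w1". */
    void put(String path, byte[] value) {
        store.put(path, value.clone());
    }

    /** Return a copy of the stored value, or null if the path is absent. */
    byte[] get(String path) {
        byte[] value = store.get(path);
        return value == null ? null : value.clone();
    }

    /** List the immediate child names under a directory-style path. */
    Set<String> children(String dir) {
        String prefix = dir.endsWith("/") ? dir : dir + "/";
        Set<String> names = new TreeSet<>();
        for (String key : store.keySet()) {
            if (key.startsWith(prefix)) {
                String rest = key.substring(prefix.length());
                if (!rest.isEmpty()) {
                    int slash = rest.indexOf('/');
                    names.add(slash < 0 ? rest : rest.substring(0, slash));
                }
            }
        }
        return names;
    }

    /** Remove a path and everything stored beneath it. */
    void delete(String path) {
        String prefix = path.endsWith("/") ? path : path + "/";
        store.keySet().removeIf(k -> k.equals(path) || k.startsWith(prefix));
    }
}
```

A flat map keyed by full path keeps writes cheap, which fits the write-heavy heartbeat workload the introduction describes; listing children costs a scan, which may or may not match what the real daemon does.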


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45514952
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java ---
    @@ -136,17 +135,19 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
     
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    -        server.closeChannel(e.getChannel());
    +        if(server != null) server.closeChannel(e.getChannel());
         }
     
         private void getSASLCredentials() throws IOException {
    -        topologyName = (String) this.server.storm_conf
    -                .get(Config.TOPOLOGY_NAME);
    -        String secretKey = SaslUtils.getSecretKey(this.server.storm_conf);
    +        String secretKey;
    +        topologyName = server.name();
    +        secretKey = server.secretKey();
    +
             if (secretKey != null) {
                 token = secretKey.getBytes();
             }
    +
             LOG.debug("SASL credentials for storm topology " + topologyName
    -                + " is " + secretKey);
    +                  + " is " + secretKey);
    --- End diff --
    
    Let's use format strings in this log message so we don't construct strings every time.
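
Editorial sketch of the point above: with string concatenation the message is built on every call, even when the log level is disabled, whereas a format string with arguments defers that work. In SLF4J (the logger used in the diff) the parameterized form would be along the lines of `LOG.debug("SASL credentials for storm topology {} is {}", topologyName, secretKey)`. To stay self-contained, the example below swaps in `java.util.logging` and its `{0}` placeholders instead of SLF4J; the class and helper names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical demo: concatenation vs. a format string when the level is off. */
final class LazyLogSketch {

    private static final Logger LOG = Logger.getLogger(LazyLogSketch.class.getName());

    /** Counts how often its string form is actually computed. */
    static final class Counted {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "topology-A";
        }
    }

    /** Concatenation builds the message before the logger checks the level. */
    static int concatCalls() {
        LOG.setLevel(Level.INFO); // FINE (debug-like) output is disabled
        Counted name = new Counted();
        LOG.fine("SASL credentials for storm topology " + name);
        return name.toStringCalls.get();
    }

    /** With a format string, the argument is never rendered while FINE is off. */
    static int parameterizedCalls() {
        LOG.setLevel(Level.INFO);
        Counted name = new Counted();
        LOG.log(Level.FINE, "SASL credentials for storm topology {0}", name);
        return name.toStringCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("concatenation toString() calls: " + concatCalls());
        System.out.println("format-string toString() calls: " + parameterizedCalls());
    }
}
```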


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45517541
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker.codec;
    +
    +import org.jboss.netty.handler.codec.frame.FrameDecoder;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channel;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.generated.HBMessageData;
    --- End diff --
    
    unused


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-158193770
  
    @knusbaum Nice work on the documentation. Thank you.
    
    +1


---

[GitHub] storm issue #838: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/838
  
    @knusbaum can you please take a look at what we have internally and be sure that the failover/load balancing features all got merged into open source?  If it is all in, then we probably need some better documentation on how to set it up and use it properly.
    
    @danny0405 not having HA is not acceptable. I agree.  The code is written so that if you have more than one pacemaker server the workers will load balance the heartbeats between them, and if one of them goes down the worker will fail over to one of the remaining servers.  Nimbus then will read from all of the pacemaker servers, and if there is more than one heartbeat for a given worker, the one with the newest timestamp wins.  You should be able to get HA just by having more than one of them, but I want to be sure that everything for that, including bug fixes, has been merged into open source.  It should be, but if you are having issues I want to be sure.
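
Editorial sketch of the read side described above: Nimbus reads heartbeats from every pacemaker server and, for each worker, keeps the one with the newest timestamp. The Java below is a minimal illustration under stated assumptions. The `Heartbeat` shape, the method name, and the worker ids are hypothetical and are not taken from Storm's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of "newest timestamp wins" across servers. */
final class HeartbeatMergeSketch {

    /** One worker heartbeat as read from a single server (assumed shape). */
    static final class Heartbeat {
        final String workerId;
        final long timestampMs;
        final byte[] payload;

        Heartbeat(String workerId, long timestampMs, byte[] payload) {
            this.workerId = workerId;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /** Merge the per-server results, keeping the newest heartbeat per worker. */
    static Map<String, Heartbeat> newestPerWorker(List<List<Heartbeat>> perServer) {
        Map<String, Heartbeat> newest = new HashMap<>();
        for (List<Heartbeat> fromOneServer : perServer) {
            for (Heartbeat hb : fromOneServer) {
                Heartbeat current = newest.get(hb.workerId);
                if (current == null || hb.timestampMs > current.timestampMs) {
                    newest.put(hb.workerId, hb);
                }
            }
        }
        return newest;
    }

    public static void main(String[] args) {
        // worker-1 failed over from server A to server B, so both hold an entry.
        List<Heartbeat> serverA = new ArrayList<>();
        serverA.add(new Heartbeat("worker-1", 1000L, new byte[] {1}));
        List<Heartbeat> serverB = new ArrayList<>();
        serverB.add(new Heartbeat("worker-1", 2000L, new byte[] {2}));
        serverB.add(new Heartbeat("worker-2", 1500L, new byte[] {3}));

        List<List<Heartbeat>> all = new ArrayList<>();
        all.add(serverA);
        all.add(serverB);

        Map<String, Heartbeat> merged = newestPerWorker(all);
        System.out.println(merged.get("worker-1").timestampMs); // prints 2000
    }
}
```

Because a stale entry left on a failed or recovered server simply loses to a newer one, the servers in this sketch never need to coordinate with each other.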


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559947
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -375,7 +394,7 @@ private void waitForPendingMessagesToBeSent() {
             long totalPendingMsgs = pendingMessages.get();
             long startMs = System.currentTimeMillis();
             while (pendingMessages.get() != 0) {
    -            try {
    +        try {
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43941560
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java ---
    @@ -0,0 +1,154 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.ChannelStateEvent;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
    +
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(KerberosSaslClientHandler.class);
    +    private ISaslClient client;
    +    long start_time;
    +    /** Used for client or server's token to send or receive from each other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +
    +    public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas_section) throws IOException {
    +        this.client = client;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        start_time = System.currentTimeMillis();
    +    }
    +
    +    @Override
    +    public void channelConnected(ChannelHandlerContext ctx,
    +            ChannelStateEvent event) {
    +        // register the newly established channel
    +        Channel channel = ctx.getChannel();
    +        client.channelConnected(channel);
    +
    +        LOG.info("Connection established from {} to {}",
    +                 channel.getLocalAddress(), channel.getRemoteAddress());
    +
    +        try {
    +            KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
    +                    .get(channel);
    +
    +            if (saslNettyClient == null) {
    +                LOG.debug("Creating saslNettyClient now for channel: {}",
    +                          channel);
    +                saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section);
    +                KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
    +                        saslNettyClient);
    +            }
    +            LOG.debug("Going to initiate Kerberos negotiations.");
    +            byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
    +            LOG.debug("Sending initial challenge: {}", initialChallenge);
    +            channel.write(new SaslMessageToken(initialChallenge));
    +        } catch (Exception e) {
    +            LOG.error("Failed to authenticate with server due to error: ",
    +                      e);
    +        }
    +        return;
    +
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
    +            throws Exception {
    +        LOG.debug("send/recv time (ms): {}",
    +                (System.currentTimeMillis() - start_time));
    +
    +        Channel channel = ctx.getChannel();
    +
    +        // Generate SASL response to server using Channel-local SASL client.
    +        KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
    +                .get(channel);
    +        if (saslNettyClient == null) {
    +            throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
    +        }
    +
    +        // examine the response message from server
    +        if (event.getMessage() instanceof ControlMessage) {
    +            ControlMessage msg = (ControlMessage) event.getMessage();
    +            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
    +                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
    +
    +                if (!saslNettyClient.isComplete()) {
    +                    String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
    +                    LOG.error(message);
    +                    throw new Exception(message);
    +                }
    +                ctx.getPipeline().remove(this);
    +                this.client.channelReady();
    +
    +                // We call fireMessageReceived since the client is allowed to
    +                // perform this request. The client's request will now proceed
    +                // to the next pipeline component namely StormClientHandler.
    +                Channels.fireMessageReceived(ctx, msg);
    +            }
    +            else {
    +                LOG.warn("Unexpected control message: {}", msg);
    +            }
    +            return;
    +        }
    +        else if (event.getMessage() instanceof SaslMessageToken) {
    +            SaslMessageToken saslTokenMessage = (SaslMessageToken) event
    +                .getMessage();
    +            LOG.debug("Responding to server's token of length: {}",
    +                      saslTokenMessage.getSaslToken().length);
    +
    +            // Generate SASL response (but we only actually send the response if
    +            // it's non-null.
    +            byte[] responseToServer = saslNettyClient
    +                .saslResponse(saslTokenMessage);
    +            if (responseToServer == null) {
    +                // If we generate a null response, then authentication has completed
    +                // (if not, warn), and return without sending a response back to the
    +                // server.
    +                LOG.debug("Response to server is null: authentication should now be complete.");
    +                if (!saslNettyClient.isComplete()) {
    +                    LOG.warn("Generated a null response, but authentication is not complete.");
    +                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    +                }
    +                this.client.channelReady();
    +                return;
    +            } else {
    +                LOG.debug("Response to server token has length: {}",
    +                          responseToServer.length);
    +            }
    +            // Construct a message containing the SASL response and send it to the
    +            // server.
    +            SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
    +            channel.write(saslResponse);
    +        }
    +        else {
    --- End diff --
    
    Minor nit: I thought the style guidelines put all `else` and `else if` clauses on the same line as the preceding `}`.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-158120792
  
    I'd like to wait for at least one more review and +1 before I merge, since both Bobby and Kishor worked with me on the design of the system.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45646317
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    --- End diff --
    
    Removed (for some reason it isn't showing up here).


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45516042
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---
    @@ -0,0 +1,252 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker;
    +
    +import backtype.storm.Config;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.messaging.netty.Client;
    +import backtype.storm.messaging.netty.ISaslClient;
    +import backtype.storm.messaging.netty.NettyRenameThreadFactory;
    +import backtype.storm.security.auth.AuthUtils;
    +import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
    +import java.net.InetSocketAddress;
    +import java.util.Map;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +import javax.security.auth.login.Configuration;
    +import org.apache.storm.pacemaker.codec.ThriftNettyClientCodec;
    +import org.jboss.netty.bootstrap.ClientBootstrap;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelPipelineFactory;
    +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class PacemakerClient implements ISaslClient {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(PacemakerClient.class);
    +
    +    private String topo_name;
    +    private String secret;
    +    private boolean ready = false;
    +    private final ClientBootstrap bootstrap;
    +    private AtomicReference<Channel> channelRef;
    +    private AtomicBoolean closing;
    +    private InetSocketAddress remote_addr;
    +    private int maxPending = 100;
    +    private HBMessage messages[];
    +    private LinkedBlockingQueue<Integer> availableMessageSlots;
    +    private ThriftNettyClientCodec.AuthMethod authMethod;
    +
    +    private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
    +    private int retryTimes = 0;
    +    
    +    public PacemakerClient(Map config) {
    +
    +        String host = (String)config.get(Config.PACEMAKER_HOST);
    +        int port = (int)config.get(Config.PACEMAKER_PORT);
    +        topo_name = (String)config.get(Config.TOPOLOGY_NAME);
    +        if(topo_name == null) {
    +            topo_name = "pacemaker-client";
    +        }
    +
    +        String auth = (String)config.get(Config.PACEMAKER_AUTH_METHOD);
    +        ThriftNettyClientCodec.AuthMethod authMethod;
    +
    +        if(auth.equals("DIGEST")) {
    --- End diff --
    
    This series of `if ... else if` can be a `switch`.
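
A minimal sketch of the suggested `switch`, assuming Java 7 or later (required for switching on strings). The `AuthMethod` enum below is a hypothetical stand-in for `ThriftNettyClientCodec.AuthMethod`; only the `"DIGEST"` value appears in the quoted diff, so the `KERBEROS` and `NONE` cases are assumptions.

```java
final class AuthMethodSwitch {

    // Hypothetical stand-in for ThriftNettyClientCodec.AuthMethod.
    // Only DIGEST is visible in the diff; the other constants are assumed.
    enum AuthMethod { DIGEST, KERBEROS, NONE }

    static AuthMethod parse(String auth) {
        // A switch on a null String throws NullPointerException,
        // so reject a missing setting explicitly first.
        if (auth == null) {
            throw new IllegalArgumentException("auth method is not configured");
        }
        switch (auth) {
            case "DIGEST":
                return AuthMethod.DIGEST;
            case "KERBEROS":
                return AuthMethod.KERBEROS;
            case "NONE":
                return AuthMethod.NONE;
            default:
                throw new IllegalArgumentException("Unknown auth method: " + auth);
        }
    }
}
```

The explicit `default` branch gives unknown values a clear failure, which a chain of `if ... else if` can easily leave out.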


---

[GitHub] storm issue #838: [STORM-885] Heartbeat Server (Pacemaker)

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/838
  
    Pacemaker is currently a stand-alone service with no HA support. When it goes down, all worker heartbeats are lost. Even if Pacemaker comes back up immediately, recovery takes a long time when there are dozens of GB of heartbeats. Until the worker heartbeats are fully restored, Nimbus will consider these workers dead because their heartbeats have timed out, and it will keep reassigning the "dead" workers until heartbeats return to normal. So during recovery many topologies will be reassigned continuously and throughput will drop sharply.
    This is not acceptable.
    So I think Pacemaker is not suitable for production while this problem exists.
    I see two ways to solve it:
    1. Pacemaker HA
    2. When Pacemaker goes down, notify Nimbus not to reassign anything until it recovers
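
The second option can be illustrated with a small sketch. This is not how Nimbus is implemented; `ReassignmentGate`, its method names, and the grace period are all hypothetical. The idea is only that reassignment stays suppressed while the heartbeat store is down and for a grace period after it returns, so workers have time to report in again.

```java
final class ReassignmentGate {

    private final long graceMillis;
    private boolean storeUp = true;
    // Earliest time at which heartbeat timeouts may trigger reassignment again.
    private long reassignAllowedAtMillis = 0L;

    ReassignmentGate(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    // Called when the heartbeat store is found to be unreachable.
    synchronized void markStoreDown() {
        storeUp = false;
    }

    // Called when the heartbeat store is reachable again.
    synchronized void markStoreUp(long nowMillis) {
        storeUp = true;
        reassignAllowedAtMillis = nowMillis + graceMillis;
    }

    // True only if the store is up and the grace period has elapsed.
    synchronized boolean mayReassign(long nowMillis) {
        return storeUp && nowMillis >= reassignAllowedAtMillis;
    }
}
```

A scheduler would consult `mayReassign` before treating a missing heartbeat as a dead worker. Choosing the grace period is the hard part: it needs to cover the time workers take to repopulate their heartbeats.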


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45511278
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -243,7 +243,7 @@ public void send(Iterator<TaskMessage> msgs) {
             if (closing) {
                 int numMessages = iteratorSize(msgs);
                 LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
    -                    dstAddressPrefixedName);
    +                      dstAddressPrefixedName);
    --- End diff --
    
    The only changes to this file are white-space.


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559934
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -292,7 +311,7 @@ private void dropMessages(Iterator<TaskMessage> msgs) {
             // We consume the iterator by traversing and thus "emptying" it.
             int msgCount = iteratorSize(msgs);
             messagesLost.getAndAdd(msgCount);
    -    }
    +                    }
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-158120301
  
    @ptgoetz 
    I've added a couple of sections for that stuff to the bottom of the documentation.


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559957
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -386,11 +405,10 @@ private void waitForPendingMessagesToBeSent() {
                 }
                 catch (InterruptedException e) {
                     break;
    -            }
             }
    -
         }
     
    +    }
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43820769
  
    --- Diff: conf/defaults.yaml ---
    @@ -49,6 +49,8 @@ storm.auth.simple-white-list.users: []
     storm.auth.simple-acl.users: []
     storm.auth.simple-acl.users.commands: []
     storm.auth.simple-acl.admins: []
    +#storm.cluster.state.store: "org.apache.storm.pacemaker.pacemaker_state_factory"
    --- End diff --
    
    Remove this commented-out line.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515488
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +
    +;; Stats Functions
    +
    +(def sleep-seconds 60)
    +
    +
    +(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
    +                                            :or {compare (fn [new old] true)
    +                                                 new-fn (fn [new old] new)}}]
    +  (loop []
    +    (let [old (.get (key stats))
    +          new (new-fn new old)]
    +      (if (compare new old)
    +        (if (.compareAndSet (key stats) old new)
    +          nil
    +          (recur))
    +        nil))))
    +
    +(defn- set-average [stats size]
    +  (check-and-set-loop
    +   stats
    +   :average-heartbeat-size
    +   size
    +   :new-fn (fn [new old]
    +            (let [count (.get (:send-pulse-count stats))]
    +                                        ; Weighted average
    +              (/ (+ new (* count old)) (+ count 1))))))
    +
    +(defn- set-largest [stats size]
    +  (check-and-set-loop
    +   stats
    +   :largest-heartbeat-size
    +   size
    +   :compare #'>))
    +
    +(defn- report-stats [heartbeats stats last-five-s]
    +  (loop []
    +      (let [send-count (.getAndSet (:send-pulse-count stats) 0)
    +            received-size (.getAndSet (:total-received-size stats) 0)
    +            get-count (.getAndSet (:get-pulse-count stats) 0)
    +            sent-size (.getAndSet (:total-sent-size stats) 0)
    +            largest (.getAndSet (:largest-heartbeat-size stats) 0)
    +            average (.getAndSet (:average-heartbeat-size stats) 0)
    +            total-keys (.size heartbeats)]
    +        (log-debug "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n"
    +                   "Sent " get-count " heartbeats totaling " sent-size " bytes,\n"
    +                   "The largest heartbeat was " largest " bytes,\n"
    +                   "The average heartbeat was " average " bytes,\n"
    +                   "Pacemaker contained " total-keys " total keys\n"
    +                   "in the last " sleep-seconds " second(s)")
    +        (dosync (ref-set last-five-s
    +                         {:send-pulse-count send-count
    +                          :total-received-size received-size
    +                          :get-pulse-count get-count
    +                          :total-sent-size sent-size
    +                          :largest-heartbeat-size largest
    +                          :average-heartbeat-size average
    +                          :total-keys total-keys})))
    +      (Thread/sleep (* 1000 sleep-seconds))
    +      (recur)))
    +
    +;; JMX stuff
    +(defn register [last-five-s]
    +  (jmx/register-mbean
    +   (jmx/create-bean
    +    last-five-s)
    +   "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds"))
    +
    +
    +;; Pacemaker Functions
    +
    +(defn hb-data [conf]
    +  (ConcurrentHashMap.))
    +
    +(defn create-path [^String path heartbeats]
    +  (HBMessage. HBServerMessageType/CREATE_PATH_RESPONSE nil))
    --- End diff --
    
    I'd rather leave the no-ops there as stubs for potential future expansion. Hopefully will help keep the structure of the program consistent.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512711
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
    +
    +    ISaslServer server;
    +    /** Used for client or server's token to send or receive from each other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslServerHandler.class);
    +
    +    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
    +        this.server = server;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        this.authorizedUsers = authorizedUsers;
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    +        throws Exception {
    +        Object msg = e.getMessage();
    +        if (msg == null)
    +            return;
    +
    +        Channel channel = ctx.getChannel();
    +
    +
    +        if (msg instanceof SaslMessageToken) {
    +            // initialize server-side SASL functionality, if we haven't yet
    +            // (in which case we are looking at the first SASL message from the
    +            // client).
    +            try {
    +                LOG.debug("Got SaslMessageToken!");
    +
    +                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
    +                    .get(channel);
    +                if (saslNettyServer == null) {
    +                    LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
    +                    try {
    +                        saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
    +                    } catch (RuntimeException ioe) {
    +                        LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
    +                                  channel.getLocalAddress(), channel.getRemoteAddress());
    +                        saslNettyServer = null;
    +                    }
    +
    +                    KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
    +                                                                                saslNettyServer);
    +                } else {
    +                    LOG.debug("Found existing saslNettyServer on server: {} for client {}",
    +                              channel.getLocalAddress(), channel.getRemoteAddress());
    +                }
    +
    +                byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
    +                                                                .getSaslToken());
    --- End diff --
    
    If we catch the RTE above, we set `saslNettyServer = null` (even though it was already), and so at this line we would get a NullPointerException.
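
The failure mode described here, and one way to avoid it, can be sketched in isolation. `TokenResponder` and `ResponderFactory` are hypothetical stand-ins for `KerberosSaslNettyServer` and its construction; returning `null` to signal "no response" is an assumption for illustration, and a real handler would more likely close the channel or rethrow.

```java
final class SaslServerGuard {

    // Hypothetical stand-in for KerberosSaslNettyServer.
    interface TokenResponder {
        byte[] response(byte[] token);
    }

    // Hypothetical factory; creation may throw a RuntimeException, as in the diff.
    interface ResponderFactory {
        TokenResponder create();
    }

    // Returns the response bytes, or null when the responder could not be created.
    static byte[] respond(ResponderFactory factory, byte[] token) {
        TokenResponder responder;
        try {
            responder = factory.create();
        } catch (RuntimeException e) {
            // Stop here instead of falling through and dereferencing null.
            return null;
        }
        return responder.response(token);
    }
}
```

The point is that the error path ends inside the `catch` block, so the later call on the responder is only reached with a non-null reference.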


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by erikdw <gi...@git.apache.org>.
Github user erikdw commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45117066
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -778,6 +792,47 @@
         public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
     
         /**
    +     * The host that Pacemaker is running on.
    +     */
    +    @isString
    +    public static final String PACEMAKER_HOST = "pacemaker.host";
    +
    +    /**
    +     * The port Pacemaker should run on. Clients should
    +     * connect to this port to submit or read heartbeats.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_PORT = "pacemaker.port";
    +
    +    /**
    +     * The maximum number of threads that should be used by the Pacemaker.
    +     * When Pacemaker gets loaded it will spawn new threads, up to 
    +     * this many total, to handle the load.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";
    +
    +    /**
    +     * This parameter is used by the storm-deploy project to configure the
    +     * jvm options for the nimbus daemon.
    --- End diff --
    
    I suppose @knusbaum means: you have a cargo culted string in your comment:  this config is for the *pacemaker* daemon, **not** the *nimbus* daemon.


---

[GitHub] storm issue #838: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the issue:

    https://github.com/apache/storm/pull/838
  
    @revans2 It looks like the HA/load balancing stuff didn't get pushed back.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45507626
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---
    @@ -0,0 +1,124 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker-state-factory
    +  (:require [org.apache.storm.pacemaker pacemaker]
    +            [backtype.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
    +            [backtype.storm
    +             [config :refer :all]
    +             [cluster :refer :all]
    +             [log :refer :all]
    +             [util :as util]])
    +  (:import [backtype.storm.generated
    +            HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse]
    +           [backtype.storm.cluster_state zookeeper_state_factory]
    +           [backtype.storm.cluster ClusterState]
    +           [org.apache.storm.pacemaker PacemakerClient])
    +  (:gen-class
    +   :implements [backtype.storm.cluster.ClusterStateFactory]))
    --- End diff --
    
    Unused?
    * `HBNodes`
    * `HBRecords`


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45642140
  
    --- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
    @@ -276,4 +313,26 @@ public static String get(Configuration configuration, String section, String key
             }
             return null;
         }
    +
    +    private static final String USERNAME = "username";
    +    private static final String PASSWORD = "password";
    +
    +    public static String makeDigestPayload(Configuration login_config, String config_section) {
    --- End diff --
    
    Fixed to generate SHA-512 digest of the username and password.
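
For readers unfamiliar with the JDK API involved, a SHA-512 digest of a username and password can be computed as sketched below. This is not the actual body of `makeDigestPayload`: joining the two values with `:` and hex-encoding the result are assumptions made for illustration, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DigestPayloadSketch {

    // Hashes "username:password" with SHA-512 and returns lowercase hex.
    // The ":" separator and hex encoding are assumptions, not the PR's format.
    static String sha512Hex(String username, String password) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-512");
            byte[] input = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
            byte[] hash = digest.digest(input);
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Surfaces a JVM without a SHA-512 provider as an unchecked error.
            throw new IllegalStateException("SHA-512 is not available", e);
        }
    }
}
```

SHA-512 produces 64 bytes, so the hex string is always 128 characters long regardless of the input.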


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45513132
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java ---
    @@ -74,20 +84,32 @@ int encodeLength() {
          * 
          * @throws Exception
          */
    -    ChannelBuffer buffer() throws Exception {
    +    public ChannelBuffer buffer() throws IOException {
    --- End diff --
    
    update javadoc with new exception


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157862156
  
    For the record, the netty messaging layer changes are only to achieve 2 things that were necessary to make the heartbeats work.
     - Add Kerberos SASL plugin for the Netty pipeline
     - Facilitate generic serialization with INettySerializable


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by bastiliu <gi...@git.apache.org>.
Github user bastiliu commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-156904210
  
    @knusbaum It seems all workers in the cluster will connect to Pacemaker to update their heartbeats. So is it possible that Pacemaker will become a new bottleneck?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/838


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45514437
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerAuthorizeHandler.java ---
    @@ -80,4 +80,4 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     		// pipeline component.
     		Channels.fireMessageReceived(ctx, msg);
     	}
    -}
    \ No newline at end of file
    +}
    --- End diff --
    
    revert white-space only change


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157887515
  
    I am +1. As part of full disclosure I didn't write any of this code, but I did talk with @knusbaum and tested the feature.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-158523479
  
    Some comments, mostly just code clean-up around new changes though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43952914
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Context.java ---
    @@ -55,6 +54,7 @@ public void prepare(Map storm_conf) {
             int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
     		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
             ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
    +        // TODO investigate impact of having one worker
    --- End diff --
    
    I don't think we need this. I'm pretty sure this is a merge error / regression.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157059718
  
    The code looks good to me and I am +1.  In the interest of full disclosure, I didn't write any of this code, but I did talk with @knusbaum a few times about its design and the direction it should go in.  I would love to see more eyes on this too.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r44302695
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,248 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +;; This is the old Thrift service that this server is emulating.
    --- End diff --
    
    I can equivalently just remove this.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512076
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java ---
    @@ -0,0 +1,223 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    --- End diff --
    
    unused?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45654094
  
    --- Diff: storm-core/src/clj/backtype/storm/util.clj ---
    @@ -1062,6 +1062,22 @@
     (defn hashmap-to-persistent [^HashMap m]
       (zipmap (.keySet m) (.values m)))
     
    +(defn retry-on-exception
    +  "Retries specific function on exception based on retries count"
    +  [retries task-description f & args]
    +  (let [res (try {:value (apply f args)}
    +              (catch Exception e
    +                (if (= 0 retries)
    --- End diff --
    
    Would like to check for `(not (pos? retries))` or `(<= retries 0)` here in case we get a negative value for `retries`. Otherwise we would get a LOT of log messages until it underflowed and came back to 0.
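
The guard suggested above can be sketched as follows. This is a minimal illustration in Java rather than the PR's Clojure, and it is not the code under review: the class `RetrySketch`, the method `retryOnException`, and the use of `System.err` in place of Storm's logging are all assumptions made for the example. The point is only that a `<= 0` test stops immediately on a negative retry count, where an `== 0` test would keep decrementing and logging.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the retry helper discussed in the review.
final class RetrySketch {

    // Runs the task, retrying on failure while retries remain.
    // A zero or negative retry count means "do not retry at all".
    static <T> T retryOnException(int retries, String taskDescription, Supplier<T> task) {
        int remaining = retries;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                // "<= 0" rather than "== 0": a negative count stops here
                // instead of counting down past zero.
                if (remaining <= 0) {
                    throw e;
                }
                System.err.println("Failed to " + taskDescription
                        + ". Retries left: " + remaining);
                remaining--;
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = retryOnException(3, "fetch heartbeat", () -> {
            // Fail twice, then succeed.
            if (attempts[0]++ < 2) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        });
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With this guard, a caller that passes a negative count by mistake sees exactly one attempt and the original exception, with no retry log lines.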


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515221
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java ---
    @@ -75,7 +75,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
                                            + message + " (" + client.getDstAddress() + ")");
             }
         }
    -
    +        
    --- End diff --
    
    revert white-space only change


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45505939
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    --- End diff --
    
    Unused?
    * `ThreadPoolExecutor`
    * `TimeUnit`
    * `LinkedBlockingDeque`
    * `Date`
    * `HBAuthorizationException`
    * `HBExecutionException`
    * `HBRecords`



---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-156172053
  
    @knusbaum could you please rebase? The Thrift version changed.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45125877
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -778,6 +792,47 @@
         public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
     
         /**
    +     * The host that Pacemaker is running on.
    +     */
    +    @isString
    +    public static final String PACEMAKER_HOST = "pacemaker.host";
    +
    +    /**
    +     * The port Pacemaker should run on. Clients should
    +     * connect to this port to submit or read heartbeats.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_PORT = "pacemaker.port";
    +
    +    /**
    +     * The maximum number of threads that should be used by the Pacemaker.
    +     * When Pacemaker gets loaded it will spawn new threads, up to 
    +     * this many total, to handle the load.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";
    +
    +    /**
    +     * This parameter is used by the storm-deploy project to configure the
    +     * jvm options for the nimbus daemon.
    --- End diff --
    
    @erikdw Yep. This was a note for myself. (this is my PR) :)


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515426
  
    --- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
    @@ -32,12 +32,18 @@
     import java.util.Collection;
     import java.util.Set;
     import java.util.HashSet;
    +import java.util.HashMap;
    --- End diff --
    
    Unused


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43940795
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Context.java ---
    @@ -55,6 +54,7 @@ public void prepare(Map storm_conf) {
             int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
     		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
             ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
    +        // TODO investigate impact of having one worker
    --- End diff --
    
    Do we need to investigate this? If so, please do the investigation.  Either way, please revert the changes to this file; there are no changes really worth making.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by redsanket <gi...@git.apache.org>.
Github user redsanket commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-155169071
  
    revert storm-core/src/genthrift.sh 100644 → 100755 permissions


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by bastiliu <gi...@git.apache.org>.
Github user bastiliu commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157255819
  
    @revans2 One of the reasons we added the topology master in JStorm was to add a level to the hierarchy that collects heartbeats and metrics, offloading Zookeeper and Nimbus (from "workers->zookeeper->nimbus" to "workers->topology master->nimbus"). In addition, Nimbus's existing monitoring mechanism can easily bring a dead topology master back up without any complex new recovery functionality. I just went through the two solutions again and thought about merging them, and I don't think they conflict. As you mentioned above, Pacemaker is designed as a pluggable component that can extend the capability of Nimbus in the future. So, for merging, we can redirect the reporting from "topology master->nimbus" to "topology master->pacemaker". That would make metrics processing scale better, since the number of connections to Pacemaker is reduced significantly (one connection per topology).


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512204
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java ---
    @@ -0,0 +1,223 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import backtype.storm.security.auth.KerberosPrincipalToLocal;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosPrincipal;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.AuthorizeCallback;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslException;
    +import javax.security.sasl.SaslServer;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +class KerberosSaslNettyServer {
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslNettyServer.class);
    +
    +    private SaslServer saslServer;
    +    private Subject subject;
    +    private String jaas_section;
    --- End diff --
    
    unused?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45517771
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java ---
    @@ -0,0 +1,108 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker.codec;
    +
    +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channel;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.generated.HBMessageData;
    +import backtype.storm.generated.HBServerMessageType;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.messaging.netty.ControlMessage;
    +import backtype.storm.messaging.netty.SaslMessageToken;
    +import backtype.storm.messaging.netty.INettySerializable;
    +import java.io.IOException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.thrift.TBase;
    +
    +public class ThriftEncoder extends OneToOneEncoder {
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(ThriftEncoder.class);
    +
    +    private HBMessage encodeNettySerializable(INettySerializable netty_message,
    +                                              HBServerMessageType mType) {
    +        
    +        HBMessageData message_data = new HBMessageData();
    +        HBMessage m = new HBMessage();
    +        try {
    +            ChannelBuffer cbuffer = netty_message.buffer();
    +            if(cbuffer.hasArray()) {
    +                message_data.set_message_blob(cbuffer.array());
    +            }
    +            else {
    +                byte buff[] = new byte[netty_message.encodeLength()];
    +                cbuffer.readBytes(buff, 0, netty_message.encodeLength());
    +                message_data.set_message_blob(buff);
    +            }
    +            m.set_type(mType);
    +            m.set_data(message_data);
    +            return m;
    +        }
    +        catch( IOException e) {
    +            LOG.error("Failed to encode NettySerializable: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
    +        if(msg == null) return null;
    +
    +        LOG.debug("Trying to encode: " + msg.getClass().toString() + " : " + msg.toString());
    +
    +        HBMessage m;
    +        if(msg instanceof INettySerializable) {
    +            INettySerializable nettyMsg = (INettySerializable)msg;
    +
    +            HBServerMessageType type;
    +            if(msg instanceof ControlMessage) {
    +                type = HBServerMessageType.CONTROL_MESSAGE;
    +            }
    +            else if(msg instanceof SaslMessageToken) {
    +                type = HBServerMessageType.SASL_MESSAGE_TOKEN;
    +            }
    +            else {
    +                LOG.error("Didn't recognise INettySerializable: " + nettyMsg.toString());
    +                throw new RuntimeException("Unrecognized INettySerializable.");
    +            }
    +            m = encodeNettySerializable(nettyMsg, type);
    +        }
    +        else {
    +            m = (HBMessage)msg;
    +        }
    +
    +        try {
    +            byte serialized[] = Utils.thriftSerialize((TBase)m);
    --- End diff --
    
    already a `TBase`


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45507464
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +
    +;; Stats Functions
    +
    +(def sleep-seconds 60)
    +
    +
    +(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
    +                                            :or {compare (fn [new old] true)
    +                                                 new-fn (fn [new old] new)}}]
    +  (loop []
    +    (let [old (.get (key stats))
    +          new (new-fn new old)]
    +      (if (compare new old)
    +        (if (.compareAndSet (key stats) old new)
    +          nil
    +          (recur))
    +        nil))))
    +
    +(defn- set-average [stats size]
    +  (check-and-set-loop
    +   stats
    +   :average-heartbeat-size
    +   size
    +   :new-fn (fn [new old]
    +            (let [count (.get (:send-pulse-count stats))]
    +                                        ; Weighted average
    +              (/ (+ new (* count old)) (+ count 1))))))
    +
    +(defn- set-largest [stats size]
    +  (check-and-set-loop
    +   stats
    +   :largest-heartbeat-size
    +   size
    +   :compare #'>))
    +
    +(defn- report-stats [heartbeats stats last-five-s]
    +  (loop []
    +      (let [send-count (.getAndSet (:send-pulse-count stats) 0)
    +            received-size (.getAndSet (:total-received-size stats) 0)
    +            get-count (.getAndSet (:get-pulse-count stats) 0)
    +            sent-size (.getAndSet (:total-sent-size stats) 0)
    +            largest (.getAndSet (:largest-heartbeat-size stats) 0)
    +            average (.getAndSet (:average-heartbeat-size stats) 0)
    +            total-keys (.size heartbeats)]
    +        (log-debug "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n"
    +                   "Sent " get-count " heartbeats totaling " sent-size " bytes,\n"
    +                   "The largest heartbeat was " largest " bytes,\n"
    +                   "The average heartbeat was " average " bytes,\n"
    +                   "Pacemaker contained " total-keys " total keys\n"
    +                   "in the last " sleep-seconds " second(s)")
    +        (dosync (ref-set last-five-s
    +                         {:send-pulse-count send-count
    +                          :total-received-size received-size
    +                          :get-pulse-count get-count
    +                          :total-sent-size sent-size
    +                          :largest-heartbeat-size largest
    +                          :average-heartbeat-size average
    +                          :total-keys total-keys})))
    +      (Thread/sleep (* 1000 sleep-seconds))
    +      (recur)))
    +
    +;; JMX stuff
    +(defn register [last-five-s]
    +  (jmx/register-mbean
    +   (jmx/create-bean
    +    last-five-s)
    +   "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds"))
    +
    +
    +;; Pacemaker Functions
    +
    +(defn hb-data [conf]
    +  (ConcurrentHashMap.))
    +
    +(defn create-path [^String path heartbeats]
    +  (HBMessage. HBServerMessageType/CREATE_PATH_RESPONSE nil))
    --- End diff --
    
    Current implementation is a no-op.  The parameters do make sense if we were to change implementation in the future.  We could leave these here or remove them.  I am fine with it either way.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45506408
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +
    +;; Stats Functions
    +
    +(def sleep-seconds 60)
    +
    +
    +(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
    +                                            :or {compare (fn [new old] true)
    +                                                 new-fn (fn [new old] new)}}]
    +  (loop []
    +    (let [old (.get (key stats))
    +          new (new-fn new old)]
    +      (if (compare new old)
    +        (if (.compareAndSet (key stats) old new)
    +          nil
    +          (recur))
    +        nil))))
    +
    +(defn- set-average [stats size]
    +  (check-and-set-loop
    +   stats
    +   :average-heartbeat-size
    +   size
    +   :new-fn (fn [new old]
    +            (let [count (.get (:send-pulse-count stats))]
    +                                        ; Weighted average
    +              (/ (+ new (* count old)) (+ count 1))))))
    +
    +(defn- set-largest [stats size]
    +  (check-and-set-loop
    +   stats
    +   :largest-heartbeat-size
    +   size
    +   :compare #'>))
    +
    +(defn- report-stats [heartbeats stats last-five-s]
    +  (loop []
    +      (let [send-count (.getAndSet (:send-pulse-count stats) 0)
    +            received-size (.getAndSet (:total-received-size stats) 0)
    +            get-count (.getAndSet (:get-pulse-count stats) 0)
    +            sent-size (.getAndSet (:total-sent-size stats) 0)
    +            largest (.getAndSet (:largest-heartbeat-size stats) 0)
    +            average (.getAndSet (:average-heartbeat-size stats) 0)
    +            total-keys (.size heartbeats)]
    +        (log-debug "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n"
    +                   "Sent " get-count " heartbeats totaling " sent-size " bytes,\n"
    +                   "The largest heartbeat was " largest " bytes,\n"
    +                   "The average heartbeat was " average " bytes,\n"
    +                   "Pacemaker contained " total-keys " total keys\n"
    +                   "in the last " sleep-seconds " second(s)")
    +        (dosync (ref-set last-five-s
    +                         {:send-pulse-count send-count
    +                          :total-received-size received-size
    +                          :get-pulse-count get-count
    +                          :total-sent-size sent-size
    +                          :largest-heartbeat-size largest
    +                          :average-heartbeat-size average
    +                          :total-keys total-keys})))
    +      (Thread/sleep (* 1000 sleep-seconds))
    +      (recur)))
    --- End diff --
    
    minor: indent of `let`
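
The `check-and-set-loop` quoted above is a compare-and-set retry loop over atomic counters. As a rough illustration only, the same idea can be sketched in Java. The class and method names below are hypothetical and not part of the pull request, the order of calls in `recordHeartbeat` is an assumption, and integer division stands in for Clojure's `/`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not code from the pull request.
final class HeartbeatStatsSketch {
    private final AtomicInteger largest = new AtomicInteger(0);
    private final AtomicInteger average = new AtomicInteger(0);
    private final AtomicInteger sendCount = new AtomicInteger(0);

    // Mirrors set-largest: only write when the new size beats the old one.
    void setLargest(int size) {
        while (true) {
            int old = largest.get();
            if (size <= old) {
                return; // nothing to update
            }
            if (largest.compareAndSet(old, size)) {
                return; // our write won
            }
            // Another thread changed the value; re-read and retry.
        }
    }

    // Mirrors set-average: fold the new size into a running weighted average.
    void setAverage(int size) {
        while (true) {
            int old = average.get();
            int count = sendCount.get();
            int updated = (size + count * old) / (count + 1);
            if (average.compareAndSet(old, updated)) {
                return;
            }
        }
    }

    // Assumed call order: update the statistics, then bump the count.
    void recordHeartbeat(int size) {
        setLargest(size);
        setAverage(size);
        sendCount.incrementAndGet();
    }

    int largest() { return largest.get(); }
    int average() { return average.get(); }

    public static void main(String[] args) {
        HeartbeatStatsSketch stats = new HeartbeatStatsSketch();
        stats.recordHeartbeat(100);
        stats.recordHeartbeat(300);
        System.out.println(stats.largest() + " " + stats.average()); // prints "300 200"
    }
}
```

The loop never blocks: a writer that loses the race simply re-reads the current value and tries again, which is why the quoted Clojure code can update the statistics without a lock.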


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43941636
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java ---
    @@ -0,0 +1,212 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslClient;
    +import javax.security.sasl.SaslException;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implements SASL logic for storm worker client processes.
    + */
    +public class KerberosSaslNettyClient {
    +
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(KerberosSaslNettyClient.class);
    +
    +    /**
    +     * Used to respond to server's counterpart, SaslServer with SASL tokens
    +     * represented as byte arrays.
    +     */
    +    private SaslClient saslClient;
    +    private Subject subject;
    +    private String jaas_section;
    +    
    +    /**
    +     * Create a KerberosSaslNettyClient for authentication with servers.
    +     */
    +    public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
    +        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
    +                  SaslUtils.KERBEROS);
    +        
    +        LOG.info("Creating Kerberos Client.");
    +        
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
    +        
    +        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
    +        
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf); 
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Client failed to login in principal:" + ex, ex);
    +            throw new RuntimeException(ex);
    +        }
    +        
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
    +            LOG.error("Failed to verify user principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \"" +
    +                                       jaas_section +
    +                                       "\" in login configuration file " +
    +                                       login_conf);
    +        }
    +
    +        String serviceName = null;
    +        try {
    +            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
    +        }
    +        catch (IOException e) {
    +            LOG.error("Failed to get service name.", e);
    +            throw new RuntimeException(e);
    +        }
    +
    +        try {
    +            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
    +            final String fPrincipalName = principal.getName();
    +            KerberosName kerbName = new KerberosName(principal.getName());
    +            final String fHost = (String)storm_conf.get(Config.PACEMAKER_HOST);
    +            final String fServiceName = serviceName;
    +            final CallbackHandler fch = ch;
    +            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
    +            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
    +                    public SaslClient run() {
    +                        try {
    +                            Map<String, String> props = new TreeMap<String,String>();
    +                            props.put(Sasl.QOP, "auth");
    +                            props.put(Sasl.SERVER_AUTH, "false");
    +                            return Sasl.createSaslClient(
    +                                new String[] { SaslUtils.KERBEROS },
    +                                fPrincipalName,
    +                                fServiceName,
    +                                fHost,
    +                                props, fch);
    +                        }
    +                        catch (Exception e) {
    +                            LOG.error("Subject failed to create sasl client.", e);
    +                            return null;
    +                        }
    +                    }
    +                });
    +            LOG.info("Got Client: {}", saslClient);
    +            
    +        } catch (PrivilegedActionException e) {
    +            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
    +            throw new RuntimeException(e);
    +        }
    +}
    --- End diff --
    
    This indentation appears to be off.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-154474226
  
    @knusbaum could you please rebase your patch?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
GitHub user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45646121
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---
    @@ -0,0 +1,124 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker-state-factory
    +  (:require [org.apache.storm.pacemaker pacemaker]
    +            [backtype.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
    +            [backtype.storm
    +             [config :refer :all]
    +             [cluster :refer :all]
    +             [log :refer :all]
    +             [util :as util]])
    +  (:import [backtype.storm.generated
    +            HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse]
    +           [backtype.storm.cluster_state zookeeper_state_factory]
    +           [backtype.storm.cluster ClusterState]
    +           [org.apache.storm.pacemaker PacemakerClient])
    +  (:gen-class
    +   :implements [backtype.storm.cluster.ClusterStateFactory]))
    --- End diff --
    
    Removed (for some reason the removal isn't showing up here).


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by HeartSaVioR <gi...@git.apache.org>.
GitHub user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-152785729
  
    @knusbaum It seems the wrong issue is associated with this pull request; STORM-885 would be the proper one.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512472
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
    +
    +    ISaslServer server;
    +    /** Used for client or server's token to send or receive from each other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslServerHandler.class);
    +
    +    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
    +        this.server = server;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        this.authorizedUsers = authorizedUsers;
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    +        throws Exception {
    +        Object msg = e.getMessage();
    +        if (msg == null)
    +            return;
    --- End diff --
    
    minor: add braces to the `if`
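
A braced version of the early return could look like the following sketch. The class and method here are hypothetical stand-ins, not the handler from the pull request; only the shape of the `if` is the point.

```java
// Hypothetical stand-in for the handler's null check; not code from the pull request.
final class NullGuardSketch {
    static String describe(Object msg) {
        // Braces make the extent of the guard explicit, even for a single statement.
        if (msg == null) {
            return "ignored";
        }
        return "handled: " + msg;
    }

    public static void main(String[] args) {
        System.out.println(describe(null));    // prints "ignored"
        System.out.println(describe("token")); // prints "handled: token"
    }
}
```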


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45510536
  
    --- Diff: storm-core/src/jvm/backtype/storm/cluster/ClusterState.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.cluster;
    +
    +import clojure.lang.APersistentMap;
    +import clojure.lang.IFn;
    +import java.util.List;
    +import org.apache.zookeeper.data.ACL;
    +
    +/**
    + * ClusterState provides the API for the pluggable state store used by the
    + * Storm daemons. Data is stored in path/value format, and the store supports
    + * listing sub-paths at a given path.
    + * All data should be available across all nodes with eventual consistency.
    + *
    + * IMPORTANT NOTE:
    + * Heartbeats have different api calls used to interact with them. The root
    + * path (/) may or may not be the same as the root path for the other api calls.
    + *
    + * For example, performing these two calls:
    + *     set_data("/path", data, acls);
    + *     void set_worker_hb("/path", heartbeat, acls);
    + * may or may not cause a collision in "/path".
    + * Never use the same paths with the *_hb* methods as you do with the others.
    + */
    +public interface ClusterState {
    +
    +    /**
    +     * Registers a callback function that gets called when CuratorEvents happen.
    +     * @param callback is a clojure IFn that accepts the type - translated to
    +     * clojure keyword as in zookeeper.clj - and the path: (callback type path)
    +     * @return is an id that can be passed to unregister(...) to unregister the
    +     * callback.
    +     */
    +    String register(IFn callback);
    +
    +    /**
    +     * Unregisters a callback function that was registered with register(...).
    +     * @param id is the String id that was returned from register(...).
    +     */
    +    void unregister(String id);
    +
    +    /**
    +     * Path will be appended with a monotonically increasing integer, a new node
    +     * will be created there, and data will be put at that node.
    +     * @param path The path that the monotonically increasing integer suffix will
    +     * be added to.
    +     * @param data The data that will be written at the suffixed path's node.
    +     * @param acls The acls to apply to the path. May be null.
    +     * @return The path with the integer suffix appended.
    +     */
    +    String create_sequential(String path, byte[] data, List<ACL> acls);
    +
    +    /**
    +     * Creates nodes for path and all its parents. Path elements are separated by
    +     * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
    +     * @param path The path to create, along with all its parents.
    +     * @param acls The acls to apply to the path. May be null.
    +     * @return path
    +     */
    +    String mkdirs(String path, List<ACL> acls);
    +
    +    /**
    +     * Deletes the node at a given path, and any child nodes that may exist.
    +     * @param path The path to delete
    +     */
    +    void delete_node(String path);
    +
    +    /**
    +     * Creates an ephemeral node at path. Ephemeral nodes are destroyed
    +     * by the store when the client disconnects.
    +     * @param path The path where a node will be created.
    +     * @param data The data to be written at the node.
    +     * @param acls The acls to apply to the path. May be null.
    +     * @return path
    +     */
    +    void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
    +
    +    /**
    +     * Gets the 'version' of the node at a path. Optionally sets a watch
    +     * on that node. The version should increase whenever a write happens.
    +     * @param path The path to get the version of.
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return The integer version of this node.
    +     */
    +    Integer get_version(String path, boolean watch);
    +
    +    /**
    +     * Check if a node exists and optionally set a watch on the path.
    +     * @param path The path to check for the existence of a node.
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return Whether or not a node exists at path.
    +     */
    +    boolean node_exists(String path, boolean watch);
    +
    +    /**
    +     * Get a list of paths of all the child nodes which exist immediately
    +     * under path.
    +     * @param path The path to look under
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return list of string paths under path.
    +     */
    +    List<String> get_children(String path, boolean watch);
    +
    +    /**
    +     * Close the connection to the data store.
    +     */
    +    void close();
    +
    +    /**
    +     * Set the value of the node at path to data.
    +     * @param path The path whose node we want to set.
    +     * @param data The data to put in the node.
    +     * @param acls The acls to apply to the path. May be null.
    +     */
    +    void set_data(String path, byte[] data, List<ACL> acls);
    +
    +    /**
    +     * Get the data from the node at path
    +     * @param path The path to look under
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return The data at the node.
    +     */
    +    byte[] get_data(String path, boolean watch);
    +
    +    /**
    +     * Get the data at the node along with its version. Data is returned
    +     * in an APersistentMap with clojure keyword keys :data and :version.
    +     * @param path The path to look under
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return An APersistentMap in the form {:data data :version version}
    +     */
    +    APersistentMap get_data_with_version(String path, boolean watch);
    +
    +    /**
    +     * Write a worker heartbeat at the path.
    +     * @param path The path whose node we want to set.
    +     * @param data The data to put in the node.
    +     * @param acls The acls to apply to the path. May be null.
    +     */
    +    void set_worker_hb(String path, byte[] data, List<ACL> acls);
    +
    +    /**
    +     * Get the heartbeat from the node at path
    +     * @param path The path to look under
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return The heartbeat at the node.
    +     */
    +    byte[] get_worker_hb(String path, boolean watch);
    +
    +    /**
    +     * Get a list of paths of all the child nodes which exist immediately
    +     * under path. This is similar to get_children, but must be used for
    +     * any nodes
    +     * @param path The path to look under
    +     * @param watch Whether or not to set a watch on the path. Watched paths
    +     * emit events which are consumed by functions registered with the
    +     * register method. Very useful for catching updates to nodes.
    +     * @return list of string paths under path.
    +     */
    +    List<String> get_worker_hb_children(String path, boolean watch);
    +
    +    /**
    +     * Deletes the heartbeat at a given path, and any child nodes that may exist.
    +     * @param path The path to delete.
    +     */
    +    void delete_worker_hb(String path);
    +
    +    /**
    +     * Add a ClusterStateListener to the connection.
    +     * @param listener A ClusterStateListener to handle changing cluster state
    +     * events.
    +     */
    +    void add_listener(ClusterStateListener listener);
    +
    +    /**
    +     * Force consistency on a path. Any writes committed on the path before
    +     * this call will be completely propagated when it returns.
    +     * @param The path to synchronize.
    --- End diff --
    
    minor: this should read `path to synchronize` (the first word after `@param` is taken as the param name)
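
To illustrate both the `@param <name> <description>` form and the "may or may not collide" note in the interface Javadoc quoted above, here is a minimal in-memory sketch. Every name in it (`TinyStateSketch`, `SeparateRootsStore`, `SharedRootStore`, `RootCollisionDemo`) is hypothetical and much smaller than the real `ClusterState`; it makes no claim about how any actual implementation stores heartbeats.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, trimmed-down store; names are for illustration only. */
interface TinyStateSketch {
    /**
     * Set the value of the node at path to data.
     * @param path The path whose node we want to set.
     * @param data The data to put in the node.
     */
    void setData(String path, byte[] data);

    /**
     * Write a worker heartbeat at the path.
     * @param path The path whose heartbeat we want to set.
     * @param data The heartbeat to store.
     */
    void setWorkerHb(String path, byte[] data);

    /**
     * Get the data from the node at path.
     * @param path The path to look under.
     * @return The data at the node, or null if none was written.
     */
    byte[] getData(String path);
}

/** Heartbeats live in their own map, so "/path" never collides. */
final class SeparateRootsStore implements TinyStateSketch {
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();
    private final Map<String, byte[]> heartbeats = new ConcurrentHashMap<>();
    public void setData(String path, byte[] d) { data.put(path, d); }
    public void setWorkerHb(String path, byte[] hb) { heartbeats.put(path, hb); }
    public byte[] getData(String path) { return data.get(path); }
}

/** Everything shares one map, so a heartbeat at "/path" overwrites the data. */
final class SharedRootStore implements TinyStateSketch {
    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();
    public void setData(String path, byte[] d) { nodes.put(path, d); }
    public void setWorkerHb(String path, byte[] hb) { nodes.put(path, hb); }
    public byte[] getData(String path) { return nodes.get(path); }
}

final class RootCollisionDemo {
    public static void main(String[] args) {
        for (TinyStateSketch store : new TinyStateSketch[] {
                new SeparateRootsStore(), new SharedRootStore() }) {
            store.setData("/path", new byte[] {1});
            store.setWorkerHb("/path", new byte[] {2});
            // SeparateRootsStore still holds 1; SharedRootStore now holds 2.
            System.out.println(store.getClass().getSimpleName()
                    + " data at /path: " + store.getData("/path")[0]);
        }
    }
}
```

Because a caller cannot tell which of the two behaviours a given store has, the safe rule is the one the Javadoc states: never reuse a path between the heartbeat methods and the others.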


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
GitHub user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45506483
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +
    +;; Stats Functions
    +
    +(def sleep-seconds 60)
    +
    +
    +(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
    +                                            :or {compare (fn [new old] true)
    +                                                 new-fn (fn [new old] new)}}]
    +  (loop []
    +    (let [old (.get (key stats))
    +          new (new-fn new old)]
    +      (if (compare new old)
    +        (if (.compareAndSet (key stats) old new)
    +          nil
    +          (recur))
    +        nil))))
    +
    +(defn- set-average [stats size]
    +  (check-and-set-loop
    +   stats
    +   :average-heartbeat-size
    +   size
    +   :new-fn (fn [new old]
    +            (let [count (.get (:send-pulse-count stats))]
    +                                        ; Weighted average
    +              (/ (+ new (* count old)) (+ count 1))))))
    +
    +(defn- set-largest [stats size]
    +  (check-and-set-loop
    +   stats
    +   :largest-heartbeat-size
    +   size
    +   :compare #'>))
    +
    +(defn- report-stats [heartbeats stats last-five-s]
    +  (loop []
    +      (let [send-count (.getAndSet (:send-pulse-count stats) 0)
    +            received-size (.getAndSet (:total-received-size stats) 0)
    +            get-count (.getAndSet (:get-pulse-count stats) 0)
    +            sent-size (.getAndSet (:total-sent-size stats) 0)
    +            largest (.getAndSet (:largest-heartbeat-size stats) 0)
    +            average (.getAndSet (:average-heartbeat-size stats) 0)
    +            total-keys (.size heartbeats)]
    +        (log-debug "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n"
    +                   "Sent " get-count " heartbeats totaling " sent-size " bytes,\n"
    +                   "The largest heartbeat was " largest " bytes,\n"
    +                   "The average heartbeat was " average " bytes,\n"
    +                   "Pacemaker contained " total-keys " total keys\n"
    +                   "in the last " sleep-seconds " second(s)")
    +        (dosync (ref-set last-five-s
    +                         {:send-pulse-count send-count
    +                          :total-received-size received-size
    +                          :get-pulse-count get-count
    +                          :total-sent-size sent-size
    +                          :largest-heartbeat-size largest
    +                          :average-heartbeat-size average
    +                          :total-keys total-keys})))
    +      (Thread/sleep (* 1000 sleep-seconds))
    +      (recur)))
    +
    +;; JMX stuff
    +(defn register [last-five-s]
    +  (jmx/register-mbean
    +   (jmx/create-bean
    +    last-five-s)
    --- End diff --
    
    minor: one-space indent


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512296
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java ---
    @@ -0,0 +1,223 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import backtype.storm.security.auth.KerberosPrincipalToLocal;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosPrincipal;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.AuthorizeCallback;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslException;
    +import javax.security.sasl.SaslServer;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +class KerberosSaslNettyServer {
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslNettyServer.class);
    +
    +    private SaslServer saslServer;
    +    private Subject subject;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
    +        this.authorizedUsers = authorizedUsers;
    +        LOG.debug("Getting Configuration.");
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +
    +        LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
    +
    +        KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(storm_conf, authorizedUsers);
    +
    +        //login our principal
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf);
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Server failed to login in principal:", ex);
    +            throw new RuntimeException(ex);
    +        }
    +
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
    +            LOG.error("Failed to verifyuser principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \""
    +                                       + jaas_section
    +                                       + "\" in login configuration file "
    +                                       + login_conf);
    +        }
    +
    +        try {
    +            LOG.info("Creating Kerberos Server.");
    +            final CallbackHandler fch = ch;
    +            Principal p = (Principal)subject.getPrincipals().toArray()[0];
    +            KerberosName kName = new KerberosName(p.getName());
    +            final String fHost = kName.getHostName();
    +            final String fServiceName = kName.getServiceName();
    +            LOG.debug("Server with host: {}", fHost);
    +            saslServer =
    +                Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
    +                        public SaslServer run() {
    +                            try {
    +                                Map<String, String> props = new TreeMap<String,String>();
    +                                props.put(Sasl.QOP, "auth");
    +                                props.put(Sasl.SERVER_AUTH, "false");
    +                                return Sasl.createSaslServer(SaslUtils.KERBEROS,
    +                                                             fServiceName,
    +                                                             fHost, props, fch);
    +                            }
    +                            catch (Exception e) {
    +                                LOG.error("Subject failed to create sasl server.", e);
    +                                return null;
    +                            }
    +                        }
    +                    });
    +            LOG.info("Got Server: {}", saslServer);
    +
    +        } catch (PrivilegedActionException e) {
    +            LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    public boolean isComplete() {
    +        return saslServer.isComplete();
    +    }
    +
    +    public String getUserName() {
    +        return saslServer.getAuthorizationID();
    +    }
    +
    +    private String getPrincipal(Subject subject) {
    +        Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
    +        if (principals==null || principals.size()<1) {
    +            LOG.info("No principal found in login subject");
    +            return null;
    +        }
    +        return ((Principal)(principals.toArray()[0])).getName();
    +    }
    --- End diff --
    
    This private method does not appear to be used.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45510058
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -778,6 +792,47 @@
         public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
     
         /**
    +     * The host that Pacemaker is running on.
    +     */
    +    @isString
    +    public static final String PACEMAKER_HOST = "pacemaker.host";
    +
    +    /**
    +     * The port Pacemaker should run on. Clients should
    +     * connect to this port to submit or read heartbeats.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_PORT = "pacemaker.port";
    +
    +    /**
    +     * The maximum number of threads that should be used by the Pacemaker.
    +     * When Pacemaker gets loaded it will spawn new threads, up to 
    +     * this many total, to handle the load.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";
    +
    +    /**
    +     * This parameter is used by the storm-deploy project to configure the
    +     * jvm options for the pacemaker daemon.
    +     */
    +    @isString
    +    public static final String PACEMAKER_CHILDOPTS = "pacemaker.childopts";
    +
    +    /**
    +     * This should be one of "DIGEST", "KERBEROS", or "NONE"
    +     * Determines the mode of authentication the pacemaker server and client use.
    +     * The client must either match the server, or be NONE. In the case of NONE,
    +     * no authentication is performed for the client, and if the server is running with
    +     * DIGEST or KERBEROS, the client can only write to the server (no reads).
    +     * This is intended to provide a primitive form of access-control.
    +     */
    +    @isString
    +    public static final String PACEMAKER_AUTH_METHOD = "pacemaker.auth.method";
    --- End diff --
    
    It would be nice to have a validator that explicitly tests the value beyond its type, but we have other configs that do the same thing.  If it becomes an issue, we can create a validator as part of another jira.
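
As a rough illustration of the value-level check suggested above, the sketch below accepts only the three values named in the Javadoc for `pacemaker.auth.method`. The class `PacemakerAuthMethodCheck` and its methods are hypothetical; they are not part of the pull request and do not use Storm's validator annotations. The comparison is assumed to be case-sensitive.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: not part of the pull request or of Storm's validation code.
final class PacemakerAuthMethodCheck {
    // Allowed values, taken from the Javadoc on PACEMAKER_AUTH_METHOD.
    private static final Set<String> ALLOWED =
        new HashSet<String>(Arrays.asList("DIGEST", "KERBEROS", "NONE"));

    private PacemakerAuthMethodCheck() {}

    /** Returns true only if value is a String naming one of the allowed methods. */
    static boolean isValid(Object value) {
        // Assumption: matching is case-sensitive, so "digest" is rejected.
        return value instanceof String && ALLOWED.contains(value);
    }

    /** Returns the value as a String, or throws if it is not an allowed method. */
    static String require(String key, Object value) {
        if (!isValid(value)) {
            throw new IllegalArgumentException(
                "Field " + key + " must be one of " + ALLOWED + ", got: " + value);
        }
        return (String) value;
    }
}
```

A check like this would reject a misspelled method when the configuration is loaded, rather than when the first client tries to authenticate.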


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-155225503
  
    @revans2 I'll wait to rebase until it's ready to merge. Others might want to see the comment history.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-157914621
  
    @knusbaum Nice documentation! Thank you.
    
    Some thoughts on documentation improvements (please correct me if I'm wrong):
    
    1. Since Pacemaker is a single-node service, what happens if it goes down or gets separated by a network partition?
    2. How does it compare to the default ZooKeeper-backed implementation (performance, fault tolerance, hardware requirements, etc.)?
    
    Documentation can make or break adoption of a new feature/API/etc.
    
    I'm +1 for merging this with the current documentation. +2 if the points above are addressed or a follow-up JIRA is created.
    



---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45513302
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java ---
    @@ -74,20 +84,32 @@ int encodeLength() {
          * 
          * @throws Exception
          */
    -    ChannelBuffer buffer() throws Exception {
    +    public ChannelBuffer buffer() throws IOException {
             ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
                     ChannelBuffers.directBuffer(encodeLength()));
    -        short identifier = -500;
             int payload_len = 0;
             if (token != null)
                 payload_len = token.length;
     
    -        bout.writeShort(identifier);
    -        bout.writeInt(payload_len);
    +        bout.writeShort(IDENTIFIER);
    +        bout.writeInt((int) payload_len);
    +
             if (payload_len > 0) {
                 bout.write(token);
             }
             bout.close();
             return bout.buffer();
         }
    +    
    +    public static SaslMessageToken read(byte[] serial) {
    +        ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial);
    +        short identifier = sm_buffer.readShort();
    +        int payload_len = sm_buffer.readInt();
    +        if(identifier != -500) {
    --- End diff --
    
    Can we use the new `IDENTIFIER` constant here?
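
The point of the comment above can be sketched as follows: the marker is defined once and both the write path and the read path refer to it, so the two cannot drift apart. This is a simplified stand-in, not the code from the pull request. `TokenFrame` is a hypothetical class, and `java.nio.ByteBuffer` is swapped in for Netty's `ChannelBuffer` to keep the example self-contained. The frame layout (2-byte identifier, 4-byte length, payload) follows the diff; the sketch does no bounds checking.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for SaslMessageToken, using java.nio instead of Netty.
final class TokenFrame {
    // Single definition of the marker, shared by encode and decode.
    static final short IDENTIFIER = -500;

    private TokenFrame() {}

    /** Layout: 2-byte identifier, 4-byte payload length, then the payload bytes. */
    static byte[] encode(byte[] token) {
        int payloadLen = (token == null) ? 0 : token.length;
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + payloadLen);
        buf.putShort(IDENTIFIER);
        buf.putInt(payloadLen); // payloadLen is already an int, so no cast is needed
        if (payloadLen > 0) {
            buf.put(token);
        }
        return buf.array();
    }

    /** Returns the payload (empty if none) and rejects frames with the wrong marker. */
    static byte[] decode(byte[] serial) {
        ByteBuffer buf = ByteBuffer.wrap(serial);
        short identifier = buf.getShort();
        int payloadLen = buf.getInt();
        if (identifier != IDENTIFIER) { // compare against the constant, not a literal
            throw new IllegalArgumentException("Unexpected identifier: " + identifier);
        }
        byte[] payload = new byte[payloadLen];
        buf.get(payload);
        return payload;
    }
}
```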


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45513199
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java ---
    @@ -74,20 +84,32 @@ int encodeLength() {
          * 
          * @throws Exception
          */
    -    ChannelBuffer buffer() throws Exception {
    +    public ChannelBuffer buffer() throws IOException {
             ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
                     ChannelBuffers.directBuffer(encodeLength()));
    -        short identifier = -500;
             int payload_len = 0;
             if (token != null)
                 payload_len = token.length;
     
    -        bout.writeShort(identifier);
    -        bout.writeInt(payload_len);
    +        bout.writeShort(IDENTIFIER);
    +        bout.writeInt((int) payload_len);
    --- End diff --
    
    `(int)` cast not needed.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43952746
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java ---
    @@ -0,0 +1,212 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslClient;
    +import javax.security.sasl.SaslException;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implements SASL logic for storm worker client processes.
    + */
    +public class KerberosSaslNettyClient {
    +
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(KerberosSaslNettyClient.class);
    +
    +    /**
    +     * Used to respond to server's counterpart, SaslServer with SASL tokens
    +     * represented as byte arrays.
    +     */
    +    private SaslClient saslClient;
    +    private Subject subject;
    +    private String jaas_section;
    +    
    +    /**
    +     * Create a KerberosSaslNettyClient for authentication with servers.
    +     */
    +    public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
    +        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
    +                  SaslUtils.KERBEROS);
    +        
    +        LOG.info("Creating Kerberos Client.");
    +        
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
    +        
    +        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
    +        
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf); 
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Client failed to login in principal:" + ex, ex);
    +            throw new RuntimeException(ex);
    +        }
    +        
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
    +            LOG.error("Failed to verify user principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \"" +
    +                                       jaas_section +
    +                                       "\" in login configuration file " +
    +                                       login_conf);
    +        }
    +
    +        String serviceName = null;
    +        try {
    +            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
    +        }
    +        catch (IOException e) {
    +            LOG.error("Failed to get service name.", e);
    +            throw new RuntimeException(e);
    +        }
    +
    +        try {
    +            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
    +            final String fPrincipalName = principal.getName();
    +            KerberosName kerbName = new KerberosName(principal.getName());
    +            final String fHost = (String)storm_conf.get(Config.PACEMAKER_HOST);
    +            final String fServiceName = serviceName;
    +            final CallbackHandler fch = ch;
    +            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
    +            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
    +                    public SaslClient run() {
    +                        try {
    +                            Map<String, String> props = new TreeMap<String,String>();
    +                            props.put(Sasl.QOP, "auth");
    +                            props.put(Sasl.SERVER_AUTH, "false");
    +                            return Sasl.createSaslClient(
    +                                new String[] { SaslUtils.KERBEROS },
    +                                fPrincipalName,
    +                                fServiceName,
    +                                fHost,
    +                                props, fch);
    +                        }
    +                        catch (Exception e) {
    +                            LOG.error("Subject failed to create sasl client.", e);
    +                            return null;
    +                        }
    +                    }
    +                });
    +            LOG.info("Got Client: {}", saslClient);
    +            
    +        } catch (PrivilegedActionException e) {
    +            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
    +            throw new RuntimeException(e);
    +        }
    +}
    +
    +    public boolean isComplete() {
    +        return saslClient.isComplete();
    +    }
    +
    +    /**
    +     * Respond to server's SASL token.
    +     * 
    +     * @param saslTokenMessage
    +     *            contains server's SASL token
    +     * @return client's response SASL token
    +     */
    +    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
    +        try {
    +            final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
    +            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
    +                    public byte[] run() {
    +                        try {
    +                            byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
    +                                                                         .getSaslToken());
    +                            return retval;
    +                        } catch (SaslException e) {
    +                            LOG.error("saslResponse: Failed to respond to SASL server's token:",
    +                                      e);
    +                            throw new RuntimeException(e);
    +                        }
    +                    }
    +                });
    +            return retval;
    +        }
    +        catch (PrivilegedActionException e) {
    +            LOG.error("Failed to generate response for token: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Implementation of javax.security.auth.callback.CallbackHandler that works
    +     * with Storm topology tokens.
    +     */
    +    private static class SaslClientCallbackHandler implements CallbackHandler {
    +
    +        /**
    +         * Set private members using topology token.
    +         * 
    +         * @param topologyToken
    +         */
    +        public SaslClientCallbackHandler() {
    +        }
    --- End diff --
    
    I can probably just remove the empty constructor, right?
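
On the question above: when a Java class declares no constructor, the compiler supplies a default no-argument one, and a private nested class can still be instantiated by its enclosing class. The sketch below illustrates this with hypothetical names (`HandlerOwner`, `NoOpCallbackHandler`) standing in for `KerberosSaslNettyClient` and `SaslClientCallbackHandler`; it is not the code from the pull request.

```java
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;

// Hypothetical outer class standing in for KerberosSaslNettyClient.
final class HandlerOwner {
    private HandlerOwner() {}

    // No constructor is declared here: the compiler supplies a default
    // no-argument constructor, so `new NoOpCallbackHandler()` still compiles.
    private static class NoOpCallbackHandler implements CallbackHandler {
        @Override
        public void handle(Callback[] callbacks) {
            // Intentionally empty for this illustration.
        }
    }

    /** Instantiates the nested handler without any explicit constructor. */
    static CallbackHandler newHandler() {
        return new NoOpCallbackHandler();
    }
}
```

If the constructor is removed, its Javadoc (which mentions a `topologyToken` parameter the constructor does not take) would go with it.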


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512856
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
    +
    +    ISaslServer server;
    +    /** Used for client or server's token to send or receive from each other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslServerHandler.class);
    +
    +    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
    +        this.server = server;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        this.authorizedUsers = authorizedUsers;
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    +        throws Exception {
    +        Object msg = e.getMessage();
    +        if (msg == null)
    +            return;
    +
    +        Channel channel = ctx.getChannel();
    +
    +
    +        if (msg instanceof SaslMessageToken) {
    +            // initialize server-side SASL functionality, if we haven't yet
    +            // (in which case we are looking at the first SASL message from the
    +            // client).
    +            try {
    +                LOG.debug("Got SaslMessageToken!");
    +
    +                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
    +                    .get(channel);
    +                if (saslNettyServer == null) {
    +                    LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
    +                    try {
    +                        saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
    +                    } catch (RuntimeException ioe) {
    +                        LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
    +                                  channel.getLocalAddress(), channel.getRemoteAddress());
    +                        saslNettyServer = null;
    +                    }
    +
    +                    KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
    +                                                                                saslNettyServer);
    +                } else {
    +                    LOG.debug("Found existing saslNettyServer on server: {} for client {}",
    +                              channel.getLocalAddress(), channel.getRemoteAddress());
    +                }
    +
    +                byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
    +                                                                .getSaslToken());
    +
    +                SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
    +
    +                if(saslTokenMessageRequest.getSaslToken() == null) {
    +                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
    +                } else {
    +                    // Send response to client.
    +                    channel.write(saslTokenMessageRequest);
    +                }
    +
    +                if (saslNettyServer.isComplete()) {
    +                    // If authentication of client is complete, we will also send a
    +                    // SASL-Complete message to the client.
    +                    LOG.info("SASL authentication is complete for client with username: {}",
    +                             saslNettyServer.getUserName());
    +                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
    +                    LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
    +                    ctx.getPipeline().remove(this);
    +                    server.authenticated(channel);
    +                }
    +                return;
    +            }
    +            catch (Exception ex) {
    +                LOG.error("Failed to handle SaslMessageToken: ", ex);
    +                throw ex;
    +            }
    +        } else {
    +            // Client should not be sending other-than-SASL messages before
    +            // SaslServerHandler has removed itself from the pipeline. Such
    +            // non-SASL requests will be denied by the Authorize channel handler
    +            // (the next handler upstream in the server pipeline) if SASL
    +            // authentication has not completed.
    +            LOG.warn("Sending upstream an unexpected non-SASL message : {}",
    +                     msg);
    +            Channels.fireMessageReceived(ctx, msg);
    +        }
    +    }
    +
    +    @Override
    +    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    +        if(server != null) server.closeChannel(e.getChannel());
    --- End diff --
    
    minor: braces on `if`


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45517616
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker.codec;
    +
    +import org.jboss.netty.handler.codec.frame.FrameDecoder;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channel;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.generated.HBMessageData;
    +import backtype.storm.generated.HBServerMessageType;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.messaging.netty.ControlMessage;
    +import backtype.storm.messaging.netty.SaslMessageToken;
    +
    +public class ThriftDecoder extends FrameDecoder {
    +
    +    @Override
    +    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
    +
    +        long available = buf.readableBytes();
    +        if(available < 2) {
    +            return null;
    +        }
    +
    +        buf.markReaderIndex();
    +
    +        int thriftLen = buf.readInt();
    +        available -= 4;
    +
    +        if(available < thriftLen) {
    +            // We haven't received the entire object yet, return and wait for more bytes.
    +            buf.resetReaderIndex();
    +            return null;
    +        }
    +
    +        buf.discardReadBytes();
    +
    +        HBMessage m;
    +        if(buf.hasArray()) {
    +            m = (HBMessage)Utils.thriftDeserialize(HBMessage.class, buf.array(), 0, thriftLen);
    --- End diff --
    
    here and below: thriftDeserialize is generic so it doesn't need the cast.
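
To illustrate the point about the cast, here is a minimal sketch, assuming a generic signature of the form `<T> T deserialize(Class<T>, byte[])`. The `GenericDeserializeSketch` class and its string-backed `deserialize` method are hypothetical stand-ins written for this example, not Storm's `Utils.thriftDeserialize`; they only show how a return type tied to the `Class<T>` argument lets the call site drop the cast.

```java
import java.nio.charset.StandardCharsets;

public class GenericDeserializeSketch {

    // Hypothetical stand-in for a generic deserializer: the return type T
    // is tied to the Class<T> argument, so callers get a typed result.
    public static <T> T deserialize(Class<T> type, byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        Object value;
        if (type == Integer.class) {
            value = Integer.valueOf(text);
        } else if (type == String.class) {
            value = text;
        } else {
            throw new IllegalArgumentException("Unsupported type: " + type);
        }
        // Class.cast performs the checked conversion once, inside the helper.
        return type.cast(value);
    }

    public static void main(String[] args) {
        byte[] payload = "42".getBytes(StandardCharsets.UTF_8);

        // No cast needed: T is inferred as Integer from Integer.class.
        Integer n = deserialize(Integer.class, payload);
        String s = deserialize(String.class, payload);

        System.out.println(n + 1);
        System.out.println(s);
    }
}
```

With a signature like this, writing `(Integer) deserialize(Integer.class, payload)` would compile but the cast would be redundant.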


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45511827
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java ---
    @@ -0,0 +1,204 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslClient;
    +import javax.security.sasl.SaslException;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implements SASL logic for storm worker client processes.
    + */
    +public class KerberosSaslNettyClient {
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslNettyClient.class);
    +
    +    /**
    +     * Used to respond to server's counterpart, SaslServer with SASL tokens
    +     * represented as byte arrays.
    +     */
    +    private SaslClient saslClient;
    +    private Subject subject;
    +    private String jaas_section;
    +
    +    /**
    +     * Create a KerberosSaslNettyClient for authentication with servers.
    +     */
    +    public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
    +        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
    +                  SaslUtils.KERBEROS);
    +
    +        LOG.info("Creating Kerberos Client.");
    +
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
    +
    +        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
    +
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf);
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Client failed to login in principal:" + ex, ex);
    +            throw new RuntimeException(ex);
    +        }
    +
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
    +            LOG.error("Failed to verify user principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \"" +
    +                                       jaas_section +
    +                                       "\" in login configuration file " +
    +                                       login_conf);
    +        }
    +
    +        String serviceName = null;
    +        try {
    +            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
    +        }
    +        catch (IOException e) {
    +            LOG.error("Failed to get service name.", e);
    +            throw new RuntimeException(e);
    +        }
    +
    +        try {
    +            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
    +            final String fPrincipalName = principal.getName();
    +            KerberosName kerbName = new KerberosName(principal.getName());
    --- End diff --
    
    `kerbName` unused?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45518313
  
    --- Diff: storm-core/test/clj/org/apache/storm/pacemaker_test.clj ---
    @@ -0,0 +1,242 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +(ns org.apache.storm.pacemaker-test
    +  (:require [clojure.test :refer :all]
    +            [org.apache.storm.pacemaker [pacemaker :as pacemaker]]
    +            [conjure.core :as conjure])
    +  (:import [backtype.storm.generated
    +            HBExecutionException HBNodes HBRecords
    --- End diff --
    
    These three unused?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515312
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java ---
    @@ -25,33 +26,39 @@
     import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import java.util.List;
     import java.util.concurrent.atomic.AtomicInteger;
     
    -class StormServerHandler extends SimpleChannelUpstreamHandler  {
    +public class StormServerHandler extends SimpleChannelUpstreamHandler  {
         private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
    -    Server server;
    +    IServer server;
         private AtomicInteger failure_count; 
    +    private Channel channel;
         
    -    StormServerHandler(Server server) {
    +    public StormServerHandler(IServer server) {
             this.server = server;
             failure_count = new AtomicInteger(0);
         }
         
         @Override
         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    -        server.addChannel(e.getChannel());
    +        server.channelConnected(e.getChannel());
    +        if(channel != null) {
    +            LOG.debug("Replacing channel with new channel: "
    +                      + channel.toString() + " -> " + e.getChannel().toString());
    --- End diff --
    
    Use format string for log message
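
For reference, SLF4J loggers accept `{}` placeholders, so the message above could be written as `LOG.debug("Replacing channel with new channel: {} -> {}", channel, e.getChannel())`, which defers building the string until the logger knows debug output is enabled. The sketch below is a self-contained illustration of that substitution only; `PlaceholderFormatSketch.format` is a hypothetical helper written for this example and is not part of SLF4J.

```java
public class PlaceholderFormatSketch {

    // Hypothetical helper: replaces each "{}" in the pattern with the next
    // argument, left to right. Extra placeholders are left untouched.
    public static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        out.append(pattern.substring(from));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(
            format("Replacing channel with new channel: {} -> {}", "old", "new"));
    }
}
```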


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559892
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -266,7 +281,7 @@ public void send(Iterator<TaskMessage> msgs) {
                 // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
                 // because we know `Channel.isWritable` was false after the messages were already in the buffer.
             }
    -    }
    --- End diff --
    
    Spacing



---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515115
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Server.java ---
    @@ -17,6 +17,21 @@
      */
     package backtype.storm.messaging.netty;
     
    +import backtype.storm.Config;
    +import backtype.storm.grouping.Load;
    +import backtype.storm.messaging.TaskMessage;
    --- End diff --
    
    `TaskMessage` is imported again further down.  There are quite a few unused imports too.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43940262
  
    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -752,6 +759,54 @@
         public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth";
     
         /**
    +     * The host that Pacemaker is running on.
    +     */
    +    @isString
    +    public static final String PACEMAKER_HOST = "pacemaker.host";
    +
    +    /**
    +     * The port Pacemaker should run on. Clients should
    +     * connect to this port to submit or read heartbeats.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_PORT = "pacemaker.port";
    +
    +    /**
    +     * The maximum number of threads that should be used by the Pacemaker.
    +     * When Pacemaker gets loaded it will spawn new threads, up to 
    +     * this many total, to handle the load.
    +     */
    +    @isNumber
    +    @isPositiveNumber
    +    public static final String PACEMAKER_MAX_THREADS = "pacemaker.max.threads";
    +
    +    /**
    +     * This parameter is used by the storm-deploy project to configure the
    +     * jvm options for the nimbus daemon.
    +     */
    +    @isString
    +    public static final String PACEMAKER_CHILDOPTS = "pacemaker.childopts";
    +
    +    /**
    +     * This should be one of "DIGEST", "KERBEROS", or "NONE"
    +     * Determines the mode of authentication the pacemaker server and client use.
    +     * The client must either match the server, or be NONE. In the case of NONE,
    +     * no authentication is performed for the client, and if the server is running with
    +     * DIGEST or KERBEROS, the client can only write to the server (no reads).
    +     * This is intended to provide a primitive form of access-control.
    +     */
    +    @isString
    +    public static final String PACEMAKER_AUTH_METHOD = "pacemaker.auth.method";
    +
    +    /**
    +     * These are the kerberos users who are authorized to read hearbeats from
    +     * Pacemaker.
    +     */
    +    @isStringList
    +    public static final String PACEMAKER_KERBEROS_USERS = "pacemaker.kerberos.users";
    --- End diff --
    
    Could we please rename this?  We already have configs that indicate who the supervisors are running as and who the admins are.  I would really prefer to see a config that indicates who nimbus is running as, so it can be shared with other things in the future if we ever need it.
    
    I would also like to see the authentication code refactored to look more like what the regular thrift auth does, with the request context and a pluggable authorization mechanism.  I don't see a reason to do it here if the code already works, but I would like to see it in a follow-on JIRA.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45516340
  
    --- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
    @@ -276,4 +313,26 @@ public static String get(Configuration configuration, String section, String key
             }
             return null;
         }
    +
    +    private static final String USERNAME = "username";
    +    private static final String PASSWORD = "password";
    +
    +    public static String makeDigestPayload(Configuration login_config, String config_section) {
    --- End diff --
    
    Technically this generates the secret payload, not the digest, but that is really minor.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45504914
  
    --- Diff: storm-core/src/clj/backtype/storm/util.clj ---
    @@ -1062,6 +1062,22 @@
     (defn hashmap-to-persistent [^HashMap m]
       (zipmap (.keySet m) (.values m)))
     
    +(defn retry-on-exception
    +  "Retries specific function on exception based on retries count"
    +  [tries task-description f & args]
    +  (let [res (try {:value (apply f args)}
    +              (catch Exception e
    +                (if (= 0 tries)
    +                  (throw e)
    +                  {:exception e})))]
    +    (if (:exception res)
    +      (do 
    +        (log-error (:exception res) (str "Failed to " task-description ". Will make [" tries "] more attempts."))
    +        (recur (dec tries) task-description f args))
    +      (do 
    +        (log-debug (str "Successful " task-description "."))
    +        (:value res)))))
    --- End diff --
    
    Let's name the var `retries` instead of `tries`, since it calls the function `tries + 1` times.
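
The off-by-one can be seen in a small sketch. `RetrySketch.retryOnException` below is a hypothetical Java analogue of the Clojure function in the diff, not code from the PR: given a budget of `retries`, a task that always fails is invoked `retries + 1` times before the exception propagates.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    // Hypothetical analogue of retry-on-exception: one initial attempt,
    // plus up to `retries` further attempts after failures.
    public static <T> T retryOnException(int retries, Callable<T> task) throws Exception {
        int remaining = retries;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (remaining == 0) {
                    throw e; // budget exhausted, propagate the last failure
                }
                remaining--;
            }
        }
    }

    // Counts how many times an always-failing task is invoked.
    public static int callsForAlwaysFailing(int retries) {
        AtomicInteger calls = new AtomicInteger();
        try {
            RetrySketch.<Object>retryOnException(retries, () -> {
                calls.incrementAndGet();
                throw new IllegalStateException("always fails");
            });
        } catch (Exception expected) {
            // The final failure is expected here.
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsForAlwaysFailing(10));
    }
}
```

Under this reading, the `max-retries` value of 10 used in `pacemaker_state_factory.clj` would allow up to 11 calls, which is why `retries` describes the parameter more accurately than `tries`.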


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-156498367
  
    I'm +1 for adding this functionality, but I would say that vote is non-binding since this is new code/functionality I'm not that familiar with yet. The code looks good to me, but I would defer to those more familiar with it to determine if it's ready.
    
    I would like to see more documentation. Specifically:
    * Rationale
    * Benefits
    * Configuration/Usage guidelines
    
    I'm okay with handling that as a separate JIRA, but I think it would be needed before we include this in a release.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43940458
  
    --- Diff: storm-core/src/jvm/backtype/storm/cluster/ClusterState.java ---
    @@ -0,0 +1,45 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.cluster;
    +
    +import clojure.lang.APersistentMap;
    +import clojure.lang.IFn;
    +import java.util.List;
    +import org.apache.zookeeper.data.ACL;
    +
    +public interface ClusterState {
    --- End diff --
    
    Minor nit: it would be nice to have a javadoc comment here and on all of the methods.  The Clojure code didn't have anything, so if you want to push back, it is not that critical.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45513090
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java ---
    @@ -63,7 +72,8 @@ public void setSaslToken(byte[] token) {
             this.token = token;
         }
     
    -    int encodeLength() {
    +
    --- End diff --
    
    extra line


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45507897
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---
    @@ -0,0 +1,124 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker-state-factory
    +  (:require [org.apache.storm.pacemaker pacemaker]
    +            [backtype.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
    +            [backtype.storm
    +             [config :refer :all]
    +             [cluster :refer :all]
    +             [log :refer :all]
    +             [util :as util]])
    +  (:import [backtype.storm.generated
    +            HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse]
    +           [backtype.storm.cluster_state zookeeper_state_factory]
    +           [backtype.storm.cluster ClusterState]
    +           [org.apache.storm.pacemaker PacemakerClient])
    +  (:gen-class
    +   :implements [backtype.storm.cluster.ClusterStateFactory]))
    +
    +;; So we can mock the client for testing
    +(defn makeClient [conf]
    +  (PacemakerClient. conf))
    +
    +(defn makeZKState [conf auth-conf acls context]
    +  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))
    +
    +(def max-retries 10)
    +
    +(defn -mkState [this conf auth-conf acls context]
    +  (let [zk-state (makeZKState conf auth-conf acls context)
    +        pacemaker-client (makeClient conf)]
    +
    +    (reify
    +      ClusterState
    +      ;; Let these pass through to the zk-state. We only want to handle heartbeats.
    +      (register [this callback] (.register zk-state callback))
    +      (unregister [this callback] (.unregister zk-state callback))
    +      (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state path data acls))
    +      (create_sequential [this path data acls] (.create_sequential zk-state path data acls))
    +      (set_data [this path data acls] (.set_data zk-state path data acls))
    +      (delete_node [this path] (.delete_node zk-state path))
    +      (get_data [this path watch?] (.get_data zk-state path watch?))
    +      (get_data_with_version [this path watch?] (.get_data_with_version zk-state path watch?))
    +      (get_version [this path watch?] (.get_version zk-state path watch?))
    +      (get_children [this path watch?] (.get_children zk-state path watch?))
    +      (mkdirs [this path acls] (.mkdirs zk-state path acls))
    +      (node_exists [this path watch?] (.node_exists zk-state path watch?))
    +      (add_listener [this listener] (.add_listener zk-state listener))
    +      (sync_path [this path] (.sync_path zk-state path))
    +      
    +      (set_worker_hb [this path data acls]
    +        (util/retry-on-exception
    +         max-retries
    +         "set_worker_hb"
    +         #(let [response
    +                (.send pacemaker-client
    +                       (HBMessage. HBServerMessageType/SEND_PULSE
    +                                   (HBMessageData/pulse
    +                                    (doto (HBPulse.)
    +                                      (.set_id path)
    +                                      (.set_details data)))))]
    +            (if (= (.get_type response) HBServerMessageType/SEND_PULSE_RESPONSE)
    +              :ok
    +              (throw (HBExecutionException. "Invalid Response Type"))))))
    --- End diff --
    
    Do we need the `acls` param here?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43802385
  
    --- Diff: storm-core/test/clj/org/apache/storm/pacemaker_test.clj ---
    @@ -0,0 +1,227 @@
    +(ns org.apache.storm.pacemaker-test
    +  (:require [clojure.test :refer :all]
    +            [org.apache.storm.pacemaker [pacemaker :as pacemaker]]
    +            [conjure.core :as conjure])
    +  (:import [backtype.storm.generated
    +            HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse]))
    +
    +(defn- message-with-rand-id [type data]
    +  (let [mid (rand-int 1000)
    +        message (HBMessage. type data)]
    +    (.set_message_id message mid)
    +    [message mid]))
    +
    +(defn- string-to-bytes [string]
    +  (byte-array (map int string)))
    +
    +(defn- bytes-to-string [bytez]
    +  (apply str (map char bytez)))
    +
    +(defn- makenode [handler path]
    +  (.handleMessage handler
    +                  (HBMessage.
    +                   HBServerMessageType/SEND_PULSE
    +                   (HBMessageData/pulse
    +                    (doto (HBPulse.)
    +                      (.set_id path)
    +                      (.set_details (string-to-bytes "nothing")))))
    +                  true))
    +
    +(deftest pacemaker-server-create-path
    +  (conjure/stubbing
    +   [pacemaker/register nil]
    +   (let [handler (pacemaker/mk-handler {})]
    +     (testing "CREATE_PATH"
    +       (let [[message mid] (message-with-rand-id
    +                            HBServerMessageType/CREATE_PATH
    +                            (HBMessageData/path "/testpath"))
    +             response (.handleMessage handler message true)]
    +         (is (= (.get_message_id response) mid))
    +         (is (= (.get_type response) HBServerMessageType/CREATE_PATH_RESPONSE))
    +         (is (= (.get_data response) nil)))))))
    +
    +(deftest pacemaker-server-exists
    +  (conjure/stubbing
    +   [pacemaker/register nil]
    +   (let [handler (pacemaker/mk-handler {})]
    +     (testing "EXISTS - false"
    +       (let [[message mid] (message-with-rand-id HBServerMessageType/EXISTS
    +                                                 (HBMessageData/path "/testpath"))
    +             bad-response (.handleMessage handler message false)
    +             good-response (.handleMessage handler message true)]
    +         (is (= (.get_message_id bad-response) mid))
    +         (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
    +
    +         (is (= (.get_message_id good-response) mid))
    +         (is (= (.get_type good-response) HBServerMessageType/EXISTS_RESPONSE))
    +         (is (= (.get_boolval (.get_data good-response)) false))))
    +
    +     (testing "EXISTS - true"
    +       (let [path "/exists_path"
    +             data-string "pulse data"]
    +         (let [[send _] (message-with-rand-id
    +                         HBServerMessageType/SEND_PULSE
    +                         (HBMessageData/pulse
    +                          (doto (HBPulse.)
    +                            (.set_id path)
    +                            (.set_details (string-to-bytes data-string)))))
    +               _ (.handleMessage handler send true)
    +               [message mid] (message-with-rand-id HBServerMessageType/EXISTS
    +                                                   (HBMessageData/path path))
    +               bad-response (.handleMessage handler message false)
    +               good-response (.handleMessage handler message true)]
    +           (is (= (.get_message_id bad-response) mid))
    +          (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
    +
    +          (is (= (.get_message_id good-response) mid))
    +          (is (= (.get_type good-response) HBServerMessageType/EXISTS_RESPONSE))
    +          (is (= (.get_boolval (.get_data good-response)) true))))))))
    --- End diff --
    
    Spacing for a bunch of these `is` forms is off.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45506713
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,239 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +
    +;; Stats Functions
    +
    +(def sleep-seconds 60)
    +
    +
    +(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
    +                                            :or {compare (fn [new old] true)
    +                                                 new-fn (fn [new old] new)}}]
    +  (loop []
    +    (let [old (.get (key stats))
    +          new (new-fn new old)]
    +      (if (compare new old)
    +        (if (.compareAndSet (key stats) old new)
    +          nil
    +          (recur))
    +        nil))))
    +
    +(defn- set-average [stats size]
    +  (check-and-set-loop
    +   stats
    +   :average-heartbeat-size
    +   size
    +   :new-fn (fn [new old]
    +            (let [count (.get (:send-pulse-count stats))]
    +                                        ; Weighted average
    +              (/ (+ new (* count old)) (+ count 1))))))
    +
    +(defn- set-largest [stats size]
    +  (check-and-set-loop
    +   stats
    +   :largest-heartbeat-size
    +   size
    +   :compare #'>))
    +
    +(defn- report-stats [heartbeats stats last-five-s]
    +  (loop []
    +      (let [send-count (.getAndSet (:send-pulse-count stats) 0)
    +            received-size (.getAndSet (:total-received-size stats) 0)
    +            get-count (.getAndSet (:get-pulse-count stats) 0)
    +            sent-size (.getAndSet (:total-sent-size stats) 0)
    +            largest (.getAndSet (:largest-heartbeat-size stats) 0)
    +            average (.getAndSet (:average-heartbeat-size stats) 0)
    +            total-keys (.size heartbeats)]
    +        (log-debug "\nReceived " send-count " heartbeats totaling " received-size " bytes,\n"
    +                   "Sent " get-count " heartbeats totaling " sent-size " bytes,\n"
    +                   "The largest heartbeat was " largest " bytes,\n"
    +                   "The average heartbeat was " average " bytes,\n"
    +                   "Pacemaker contained " total-keys " total keys\n"
    +                   "in the last " sleep-seconds " second(s)")
    +        (dosync (ref-set last-five-s
    +                         {:send-pulse-count send-count
    +                          :total-received-size received-size
    +                          :get-pulse-count get-count
    +                          :total-sent-size sent-size
    +                          :largest-heartbeat-size largest
    +                          :average-heartbeat-size average
    +                          :total-keys total-keys})))
    +      (Thread/sleep (* 1000 sleep-seconds))
    +      (recur)))
    +
    +;; JMX stuff
    +(defn register [last-five-s]
    +  (jmx/register-mbean
    +   (jmx/create-bean
    +    last-five-s)
    +   "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds"))
    +
    +
    +;; Pacemaker Functions
    +
    +(defn hb-data [conf]
    +  (ConcurrentHashMap.))
    --- End diff --
    
    remove `conf`?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45513917
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java ---
    @@ -35,127 +35,124 @@
     
     class SaslNettyServer {
     
    -	private static final Logger LOG = LoggerFactory
    -			.getLogger(SaslNettyServer.class);
    -
    -	private SaslServer saslServer;
    -
    -	SaslNettyServer(String topologyName, byte[] token) throws IOException {
    -		LOG.debug("SaslNettyServer: Topology token is: " + topologyName
    -				+ " with authmethod " + SaslUtils.AUTH_DIGEST_MD5);
    -
    -		try {
    -
    -			SaslDigestCallbackHandler ch = new SaslNettyServer.SaslDigestCallbackHandler(
    -					topologyName, token);
    -
    -			saslServer = Sasl.createSaslServer(SaslUtils.AUTH_DIGEST_MD5, null,
    -					SaslUtils.DEFAULT_REALM, SaslUtils.getSaslProps(), ch);
    -
    -		} catch (SaslException e) {
    -			LOG.error("SaslNettyServer: Could not create SaslServer: " + e);
    -		}
    -
    -	}
    -
    -	public boolean isComplete() {
    -		return saslServer.isComplete();
    -	}
    -
    -	public String getUserName() {
    -		return saslServer.getAuthorizationID();
    -	}
    -
    -	/** CallbackHandler for SASL DIGEST-MD5 mechanism */
    -	public static class SaslDigestCallbackHandler implements CallbackHandler {
    -
    -		/** Used to authenticate the clients */
    -		private byte[] userPassword;
    -		private String userName;
    -
    -		public SaslDigestCallbackHandler(String topologyName, byte[] token) {
    -			LOG.debug("SaslDigestCallback: Creating SaslDigestCallback handler "
    -					+ "with topology token: " + topologyName);
    -			this.userName = topologyName;
    -			this.userPassword = token;
    -		}
    -
    -		@Override
    -		public void handle(Callback[] callbacks) throws IOException,
    -				UnsupportedCallbackException {
    -			NameCallback nc = null;
    -			PasswordCallback pc = null;
    -			AuthorizeCallback ac = null;
    -
    -			for (Callback callback : callbacks) {
    -				if (callback instanceof AuthorizeCallback) {
    -					ac = (AuthorizeCallback) callback;
    -				} else if (callback instanceof NameCallback) {
    -					nc = (NameCallback) callback;
    -				} else if (callback instanceof PasswordCallback) {
    -					pc = (PasswordCallback) callback;
    -				} else if (callback instanceof RealmCallback) {
    -					continue; // realm is ignored
    -				} else {
    -					throw new UnsupportedCallbackException(callback,
    -							"handle: Unrecognized SASL DIGEST-MD5 Callback");
    -				}
    -			}
    -
    -			if (nc != null) {
    -				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
    -						+ "username for client: " + userName);
    -
    -				nc.setName(userName);
    -			}
    -
    -			if (pc != null) {
    -				char[] password = SaslUtils.encodePassword(userPassword);
    -
    -				LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
    -						+ "password for client: " + userPassword);
    -
    -				pc.setPassword(password);
    -			}
    -			if (ac != null) {
    -
    -				String authid = ac.getAuthenticationID();
    -				String authzid = ac.getAuthorizationID();
    -
    -				if (authid.equals(authzid)) {
    -					ac.setAuthorized(true);
    -				} else {
    -					ac.setAuthorized(false);
    -				}
    -
    -				if (ac.isAuthorized()) {
    -					LOG.debug("handle: SASL server DIGEST-MD5 callback: setting "
    -							+ "canonicalized client ID: " + userName);
    -					ac.setAuthorizedID(authzid);
    -				}
    -			}
    -		}
    -	}
    -
    -	/**
    -	 * Used by SaslTokenMessage::processToken() to respond to server SASL
    -	 * tokens.
    -	 * 
    -	 * @param token
    -	 *            Server's SASL token
    -	 * @return token to send back to the server.
    -	 */
    -	public byte[] response(byte[] token) {
    -		try {
    -			LOG.debug("response: Responding to input token of length: "
    -					+ token.length);
    -			byte[] retval = saslServer.evaluateResponse(token);
    -			LOG.debug("response: Response token length: " + retval.length);
    -			return retval;
    -		} catch (SaslException e) {
    -			LOG.error("response: Failed to evaluate client token of length: "
    -					+ token.length + " : " + e);
    -			return null;
    -		}
    -	}
    -}
    \ No newline at end of file
    +
    --- End diff --
    
    extra line


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45646200
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
    +
    +    ISaslServer server;
    +    /** Used for client or server's token to send or receive from each other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslServerHandler.class);
    +
    +    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
    +        this.server = server;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        this.authorizedUsers = authorizedUsers;
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    +        throws Exception {
    +        Object msg = e.getMessage();
    +        if (msg == null)
    +            return;
    +
    +        Channel channel = ctx.getChannel();
    +
    +
    +        if (msg instanceof SaslMessageToken) {
    +            // initialize server-side SASL functionality, if we haven't yet
    +            // (in which case we are looking at the first SASL message from the
    +            // client).
    +            try {
    +                LOG.debug("Got SaslMessageToken!");
    +
    +                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
    +                    .get(channel);
    +                if (saslNettyServer == null) {
    +                    LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
    +                    try {
    +                        saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
    +                    } catch (RuntimeException ioe) {
    +                        LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
    +                                  channel.getLocalAddress(), channel.getRemoteAddress());
    +                        saslNettyServer = null;
    +                    }
    +
    +                    KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
    +                                                                                saslNettyServer);
    +                } else {
    +                    LOG.debug("Found existing saslNettyServer on server: {} for client {}",
    +                              channel.getLocalAddress(), channel.getRemoteAddress());
    +                }
    +
    +                byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
    +                                                                .getSaslToken());
    --- End diff --
    
    Fixed above.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43938663
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
    @@ -0,0 +1,248 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker
    +  (:import [org.apache.storm.pacemaker PacemakerServer IServerMessageHandler]
    +           [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor TimeUnit LinkedBlockingDeque]
    +           [java.util.concurrent.atomic AtomicInteger]
    +           [java.util Date]
    +           [backtype.storm.generated
    +            HBAuthorizationException HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse])
    +  (:use [clojure.string :only [replace-first split]]
    +        [backtype.storm log config util])
    +  (:require [clojure.java.jmx :as jmx])
    +  (:gen-class))
    +
    +;; This is the old Thrift service that this server is emulating.
    --- End diff --
    
    It's not really old. And can you explain why you are emulating a Thrift service instead of using Thrift?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43942352
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java ---
    @@ -0,0 +1,212 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import backtype.storm.Config;
    +import backtype.storm.security.auth.AuthUtils;
    +import java.io.IOException;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import javax.security.auth.Subject;
    +import javax.security.auth.callback.Callback;
    +import javax.security.auth.callback.CallbackHandler;
    +import javax.security.auth.callback.UnsupportedCallbackException;
    +import javax.security.auth.kerberos.KerberosTicket;
    +import javax.security.auth.login.Configuration;
    +import javax.security.auth.login.LoginException;
    +import javax.security.sasl.Sasl;
    +import javax.security.sasl.SaslClient;
    +import javax.security.sasl.SaslException;
    +import org.apache.zookeeper.Login;
    +import org.apache.zookeeper.server.auth.KerberosName;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implements SASL logic for storm worker client processes.
    + */
    +public class KerberosSaslNettyClient {
    +
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(KerberosSaslNettyClient.class);
    +
    +    /**
    +     * Used to respond to server's counterpart, SaslServer with SASL tokens
    +     * represented as byte arrays.
    +     */
    +    private SaslClient saslClient;
    +    private Subject subject;
    +    private String jaas_section;
    +    
    +    /**
    +     * Create a KerberosSaslNettyClient for authentication with servers.
    +     */
    +    public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
    +        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
    +                  SaslUtils.KERBEROS);
    +        
    +        LOG.info("Creating Kerberos Client.");
    +        
    +        Configuration login_conf;
    +        try {
    +            login_conf = AuthUtils.GetConfiguration(storm_conf);
    +        }
    +        catch (Throwable t) {
    +            LOG.error("Failed to get login_conf: ", t);
    +            throw t;
    +        }
    +        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
    +        
    +        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
    +        
    +        subject = null;
    +        try {
    +            LOG.debug("Setting Configuration to login_config: {}", login_conf);
    +            //specify a configuration object to be used
    +            Configuration.setConfiguration(login_conf); 
    +            //now login
    +            LOG.debug("Trying to login.");
    +            Login login = new Login(jaas_section, ch);
    +            subject = login.getSubject();
    +            LOG.debug("Got Subject: {}", subject.toString());
    +        } catch (LoginException ex) {
    +            LOG.error("Client failed to login in principal:" + ex, ex);
    +            throw new RuntimeException(ex);
    +        }
    +        
    +        //check the credential of our principal
    +        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
    +            LOG.error("Failed to verify user principal.");
    +            throw new RuntimeException("Fail to verify user principal with section \"" +
    +                                       jaas_section +
    +                                       "\" in login configuration file " +
    +                                       login_conf);
    +        }
    +
    +        String serviceName = null;
    +        try {
    +            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
    +        }
    +        catch (IOException e) {
    +            LOG.error("Failed to get service name.", e);
    +            throw new RuntimeException(e);
    +        }
    +
    +        try {
    +            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
    +            final String fPrincipalName = principal.getName();
    +            KerberosName kerbName = new KerberosName(principal.getName());
    +            final String fHost = (String)storm_conf.get(Config.PACEMAKER_HOST);
    +            final String fServiceName = serviceName;
    +            final CallbackHandler fch = ch;
    +            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
    +            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
    +                    public SaslClient run() {
    +                        try {
    +                            Map<String, String> props = new TreeMap<String,String>();
    +                            props.put(Sasl.QOP, "auth");
    +                            props.put(Sasl.SERVER_AUTH, "false");
    +                            return Sasl.createSaslClient(
    +                                new String[] { SaslUtils.KERBEROS },
    +                                fPrincipalName,
    +                                fServiceName,
    +                                fHost,
    +                                props, fch);
    +                        }
    +                        catch (Exception e) {
    +                            LOG.error("Subject failed to create sasl client.", e);
    +                            return null;
    +                        }
    +                    }
    +                });
    +            LOG.info("Got Client: {}", saslClient);
    +            
    +        } catch (PrivilegedActionException e) {
    +            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
    +            throw new RuntimeException(e);
    +        }
    +}
    +
    +    public boolean isComplete() {
    +        return saslClient.isComplete();
    +    }
    +
    +    /**
    +     * Respond to server's SASL token.
    +     * 
    +     * @param saslTokenMessage
    +     *            contains server's SASL token
    +     * @return client's response SASL token
    +     */
    +    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
    +        try {
    +            final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
    +            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
    +                    public byte[] run() {
    +                        try {
    +                            byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
    +                                                                         .getSaslToken());
    +                            return retval;
    +                        } catch (SaslException e) {
    +                            LOG.error("saslResponse: Failed to respond to SASL server's token:",
    +                                      e);
    +                            throw new RuntimeException(e);
    +                        }
    +                    }
    +                });
    +            return retval;
    +        }
    +        catch (PrivilegedActionException e) {
    +            LOG.error("Failed to generate response for token: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    /**
    +     * Implementation of javax.security.auth.callback.CallbackHandler that works
    +     * with Storm topology tokens.
    +     */
    +    private static class SaslClientCallbackHandler implements CallbackHandler {
    +
    +        /**
    +         * Set private members using topology token.
    +         * 
    +         * @param topologyToken
    +         */
    +        public SaslClientCallbackHandler() {
    +        }
    --- End diff --
    
    Could you put a `// NOOP` comment or something similar in the empty blocks?
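
A minimal sketch of what such a change might look like. The class name `NoopCallbackHandlerSketch` is hypothetical and is not part of the pull request; it only illustrates marking deliberately empty bodies with a short comment so that readers can tell the emptiness is intentional:

```java
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;

// Hypothetical stand-in for the handler under review; not the PR's actual class.
class NoopCallbackHandlerSketch implements CallbackHandler {

    NoopCallbackHandlerSketch() {
        // NOOP: there is no state to initialize.
    }

    @Override
    public void handle(Callback[] callbacks) {
        // NOOP: this sketch deliberately ignores every callback it is given.
    }
}
```

The comment costs nothing at runtime and keeps a later reader (or a static-analysis tool) from mistaking the empty block for unfinished code.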


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45505380
  
    --- Diff: storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj ---
    @@ -0,0 +1,157 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns backtype.storm.cluster-state.zookeeper-state-factory
    +  (:import [org.apache.curator.framework.state ConnectionStateListener])
    +  (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]
    +           [backtype.storm.cluster ClusterState ClusterStateContext DaemonType])
    +  (:use [backtype.storm cluster config log util])
    +  (:require [backtype.storm [zookeeper :as zk]])
    +  (:gen-class
    +   :implements [backtype.storm.cluster.ClusterStateFactory]))
    --- End diff --
    
    Unused imports?
    * `KeeperException`
    * `ZooDefs`
    * `ZooDefs$Ids`
    * `ZooDefs$Perms`
    * `ClusterStateContext`



---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515660
  
    --- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---
    @@ -0,0 +1,124 @@
    +;; Licensed to the Apache Software Foundation (ASF) under one
    +;; or more contributor license agreements.  See the NOTICE file
    +;; distributed with this work for additional information
    +;; regarding copyright ownership.  The ASF licenses this file
    +;; to you under the Apache License, Version 2.0 (the
    +;; "License"); you may not use this file except in compliance
    +;; with the License.  You may obtain a copy of the License at
    +;;
    +;; http://www.apache.org/licenses/LICENSE-2.0
    +;;
    +;; Unless required by applicable law or agreed to in writing, software
    +;; distributed under the License is distributed on an "AS IS" BASIS,
    +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +;; See the License for the specific language governing permissions and
    +;; limitations under the License.
    +
    +(ns org.apache.storm.pacemaker.pacemaker-state-factory
    +  (:require [org.apache.storm.pacemaker pacemaker]
    +            [backtype.storm.cluster-state [zookeeper-state-factory :as zk-factory]]
    +            [backtype.storm
    +             [config :refer :all]
    +             [cluster :refer :all]
    +             [log :refer :all]
    +             [util :as util]])
    +  (:import [backtype.storm.generated
    +            HBExecutionException HBNodes HBRecords
    +            HBServerMessageType HBMessage HBMessageData HBPulse]
    +           [backtype.storm.cluster_state zookeeper_state_factory]
    +           [backtype.storm.cluster ClusterState]
    +           [org.apache.storm.pacemaker PacemakerClient])
    +  (:gen-class
    +   :implements [backtype.storm.cluster.ClusterStateFactory]))
    +
    +;; So we can mock the client for testing
    +(defn makeClient [conf]
    +  (PacemakerClient. conf))
    +
    +(defn makeZKState [conf auth-conf acls context]
    +  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))
    +
    +(def max-retries 10)
    +
    +(defn -mkState [this conf auth-conf acls context]
    +  (let [zk-state (makeZKState conf auth-conf acls context)
    +        pacemaker-client (makeClient conf)]
    +
    +    (reify
    +      ClusterState
    +      ;; Let these pass through to the zk-state. We only want to handle heartbeats.
    +      (register [this callback] (.register zk-state callback))
    +      (unregister [this callback] (.unregister zk-state callback))
    +      (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state path data acls))
    +      (create_sequential [this path data acls] (.create_sequential zk-state path data acls))
    +      (set_data [this path data acls] (.set_data zk-state path data acls))
    +      (delete_node [this path] (.delete_node zk-state path))
    +      (get_data [this path watch?] (.get_data zk-state path watch?))
    +      (get_data_with_version [this path watch?] (.get_data_with_version zk-state path watch?))
    +      (get_version [this path watch?] (.get_version zk-state path watch?))
    +      (get_children [this path watch?] (.get_children zk-state path watch?))
    +      (mkdirs [this path acls] (.mkdirs zk-state path acls))
    +      (node_exists [this path watch?] (.node_exists zk-state path watch?))
    +      (add_listener [this listener] (.add_listener zk-state listener))
    +      (sync_path [this path] (.sync_path zk-state path))
    +      
    +      (set_worker_hb [this path data acls]
    +        (util/retry-on-exception
    +         max-retries
    +         "set_worker_hb"
    +         #(let [response
    +                (.send pacemaker-client
    +                       (HBMessage. HBServerMessageType/SEND_PULSE
    +                                   (HBMessageData/pulse
    +                                    (doto (HBPulse.)
    +                                      (.set_id path)
    +                                      (.set_details data)))))]
    +            (if (= (.get_type response) HBServerMessageType/SEND_PULSE_RESPONSE)
    +              :ok
    +              (throw (HBExecutionException. "Invalid Response Type"))))))
    --- End diff --
    
    It's part of the ClusterState interface spec.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45515366
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java ---
    @@ -25,33 +26,39 @@
     import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import java.util.List;
     import java.util.concurrent.atomic.AtomicInteger;
     
    -class StormServerHandler extends SimpleChannelUpstreamHandler  {
    +public class StormServerHandler extends SimpleChannelUpstreamHandler  {
         private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
    -    Server server;
    +    IServer server;
         private AtomicInteger failure_count; 
    +    private Channel channel;
         
    -    StormServerHandler(Server server) {
    +    public StormServerHandler(IServer server) {
             this.server = server;
             failure_count = new AtomicInteger(0);
         }
         
         @Override
         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    -        server.addChannel(e.getChannel());
    +        server.channelConnected(e.getChannel());
    +        if(channel != null) {
    +            LOG.debug("Replacing channel with new channel: "
    +                      + channel.toString() + " -> " + e.getChannel().toString());
    --- End diff --
    
    Also, I don't think we need to call `.toString` here.
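
To illustrate the point about `.toString`: Java's `+` operator on strings already converts each operand to a string, so the explicit calls add nothing for non-null values. The following is a minimal sketch, not the PR's code; `FakeChannel` is a hypothetical stand-in for a Netty `Channel`, and the class and method names are made up for the example.

```java
public class ConcatToStringDemo {
    // Hypothetical stand-in for a Netty Channel; only toString() matters here.
    static final class FakeChannel {
        private final int id;

        FakeChannel(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "[id: " + id + "]";
        }
    }

    // Mirrors the style in the diff: explicit toString() on each operand.
    static String explicit(Object oldCh, Object newCh) {
        return "Replacing channel with new channel: "
                + oldCh.toString() + " -> " + newCh.toString();
    }

    // Same message; string concatenation converts non-null operands via toString().
    static String implicit(Object oldCh, Object newCh) {
        return "Replacing channel with new channel: " + oldCh + " -> " + newCh;
    }

    public static void main(String[] args) {
        FakeChannel a = new FakeChannel(1);
        FakeChannel b = new FakeChannel(2);
        System.out.println(implicit(a, b));
        System.out.println(explicit(a, b).equals(implicit(a, b)));
    }
}
```

With SLF4J, another option is the parameterized form, `LOG.debug("Replacing channel with new channel: {} -> {}", channel, e.getChannel())`, which also avoids building the message when debug logging is disabled.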


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45655066
  
    --- Diff: storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java ---
    @@ -276,4 +312,39 @@ public static String get(Configuration configuration, String section, String key
             }
             return null;
         }
    +
    +    private static final String USERNAME = "username";
    +    private static final String PASSWORD = "password";
    +
    +    public static String makeDigestPayload(Configuration login_config, String config_section) {
    +        String username = null;
    +        String password = null;
    +        try {
    +            Map<String, ?> results = AuthUtils.PullConfig(login_config, config_section);
    +            username = (String)results.get(USERNAME);
    +            password = (String)results.get(PASSWORD);
    +        }
    +        catch (Exception e) {
    +            LOG.error("Failed to pull username/password out of jaas conf", e);
    +        }
    +
    +        if(username == null || password == null) {
    +            return null;
    +        }
    +
    +        try {
    +            MessageDigest digest = MessageDigest.getInstance("SHA-512");
    +            byte[] output = digest.digest((username + ":" + password).getBytes());
    +
    +            StringBuilder builder = new StringBuilder();
    +            for(byte b : output) {
    +            builder.append(String.format("%02x", b));
    --- End diff --
    
    Check indent under `for`
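
For reference, the hex-encoding loop with the body indented under the `for` could look like the sketch below. It is an illustration only, assuming the same "username:password" input and SHA-512 digest as the hunk; the class and method names are hypothetical, the charset is pinned to UTF-8 (the hunk uses the platform default via `getBytes()`), and the checked exception is wrapped so the helper is easy to call.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestPayloadSketch {
    // Hypothetical helper: hashes "username:password" and returns lowercase hex.
    static String hexSha512(String username, String password) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-512");
            // Assumption: UTF-8 instead of the platform default charset.
            byte[] output = digest.digest(
                    (username + ":" + password).getBytes(StandardCharsets.UTF_8));

            StringBuilder builder = new StringBuilder(output.length * 2);
            for (byte b : output) {
                // Two hex digits per byte, zero-padded.
                builder.append(String.format("%02x", b));
            }
            return builder.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-512 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hexSha512("some username", "some password"));
    }
}
```

A SHA-512 digest is 64 bytes, so the returned string is always 128 hex characters.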


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559976
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -510,8 +546,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
                     throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
                             connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
     
    -            }
    +    }
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45517109
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---
    @@ -0,0 +1,252 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker;
    +
    +import backtype.storm.Config;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.messaging.netty.Client;
    --- End diff --
    
    unused?


---

[GitHub] storm issue #838: [STORM-885] Heartbeat Server (Pacemaker)

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/838
  
    @knusbaum please file a JIRA and make sure that we do that soon. Did you do that work, or was it @redsanket?


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559942
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -335,7 +354,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
                 }
     
             });
    -    }
    +        }
    --- End diff --
    
    Spacing


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-156504222
  
    @ptgoetz Sounds good. The good news is that Pacemaker is still off by default, so aside from making ClusterState pluggable, there should be no changes to the existing code path.
    
    I'm currently working on documentation for it as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45511599
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java ---
    @@ -0,0 +1,26 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +
    +public interface INettySerializable {
    +    public ChannelBuffer buffer() throws IOException;
    +    public int encodeLength();
    --- End diff --
    
    methods don't need `public` here
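
The reason the modifier is redundant: methods declared in a Java interface without a body are implicitly `public abstract`. The sketch below checks this by reflection; `Sized` is a hypothetical interface written without the modifier, not the PR's `INettySerializable`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class InterfaceModifierDemo {
    // Hypothetical interface, declared without the redundant `public`.
    interface Sized {
        int encodeLength();
    }

    // Returns true if every method declared on the interface is public and abstract.
    static boolean allMethodsPublicAbstract(Class<?> iface) {
        for (Method m : iface.getDeclaredMethods()) {
            int mod = m.getModifiers();
            if (!Modifier.isPublic(mod) || !Modifier.isAbstract(mod)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allMethodsPublicAbstract(Sized.class));
    }
}
```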


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/838#issuecomment-159059774
  
    Thanks,
    +1 pending no relevant test failures.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45505075
  
    --- Diff: storm-core/src/clj/backtype/storm/util.clj ---
    @@ -1062,6 +1062,22 @@
     (defn hashmap-to-persistent [^HashMap m]
       (zipmap (.keySet m) (.values m)))
     
    +(defn retry-on-exception
    +  "Retries specific function on exception based on retries count"
    +  [tries task-description f & args]
    +  (let [res (try {:value (apply f args)}
    +              (catch Exception e
    +                (if (= 0 tries)
    +                  (throw e)
    +                  {:exception e})))]
    --- End diff --
    
    Probably should check that `tries` is not `pos?` instead of `(= 0 tries)`, in case of crazy values of `tries` (e.g. negative ones, which would never equal 0).
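
A sketch of the suggested guard, written in Java rather than Clojure for illustration. The rest of `retry-on-exception` is not shown in the hunk, so the decrement-and-retry behaviour here is an assumption, and all names (`RetrySketch`, `retryOnException`, `attemptsBeforeGivingUp`) are hypothetical. The point is the `remaining <= 0` test: a negative retry count gives up after the first failure instead of never matching zero.

```java
import java.util.function.Supplier;

public class RetrySketch {
    // Runs the task, retrying on failure up to `tries` more times.
    // A zero or negative `tries` means: no retries, rethrow the first failure.
    static <T> T retryOnException(int tries, String taskDescription, Supplier<T> task) {
        int remaining = tries;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (remaining <= 0) {
                    // Equivalent of "not pos?" rather than "= 0".
                    throw e;
                }
                remaining--;
                System.err.println("Retrying " + taskDescription
                        + ", retries left: " + remaining);
            }
        }
    }

    // Counts how many times an always-failing task is attempted before giving up.
    static int attemptsBeforeGivingUp(int tries) {
        final int[] attempts = {0};
        try {
            retryOnException(tries, "always-failing task", () -> {
                attempts[0]++;
                throw new IllegalStateException("simulated failure");
            });
        } catch (IllegalStateException expected) {
            // The retry budget is exhausted; fall through and report the count.
        }
        return attempts[0];
    }

    public static void main(String[] args) {
        System.out.println(attemptsBeforeGivingUp(2));
        System.out.println(attemptsBeforeGivingUp(-5));
    }
}
```

With an equality check against zero, the `-5` case would keep retrying for as long as the task keeps failing.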


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45505612
  
    --- Diff: bin/storm.py ---
    @@ -263,6 +263,17 @@ def upload_credentials(*args):
             jvmtype="-client",
             extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
     
    +def heartbeats(*args):
    +    """Syntax: [storm heartbeats [cmd]]
    +
    +    list [KEY...] - lists heartbeats nodes under KEY currently in pacemaker.
    --- End diff --
    
    According to the heartbeats.clj command, we also support a `get` subcommand.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45502326
  
    --- Diff: bin/storm.py ---
    @@ -263,6 +263,17 @@ def upload_credentials(*args):
             jvmtype="-client",
             extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
     
    +def heartbeats(*args):
    +    """Syntax: [storm heartbeats [cmd]]
    +
    +    list [KEY...] - lists heartbeats nodes under KEY currently in pacemaker.
    --- End diff --
    
    It might be more intuitive to use `PATH` here instead of `KEY`.  We use it by running `storm heartbeats /`, etc.


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45517334
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java ---
    @@ -0,0 +1,158 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker;
    +
    +import backtype.storm.Config;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.messaging.netty.ISaslServer;
    +import backtype.storm.messaging.netty.NettyRenameThreadFactory;
    +import backtype.storm.security.auth.AuthUtils;
    +import java.lang.InterruptedException;
    +import java.net.InetSocketAddress;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListSet;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ThreadFactory;
    +import javax.security.auth.login.Configuration;
    +import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec;
    +import org.jboss.netty.bootstrap.ServerBootstrap;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelPipelineFactory;
    +import org.jboss.netty.channel.group.ChannelGroup;
    +import org.jboss.netty.channel.group.DefaultChannelGroup;
    +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +class PacemakerServer implements ISaslServer {
    +
    +    private static final long FIVE_MB_IN_BYTES = 5 * 1024 * 1024;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(PacemakerServer.class);
    +
    +    private final ServerBootstrap bootstrap;
    +    private int port;
    +    private IServerMessageHandler handler;
    +    private String secret;
    +    private String topo_name;
    +    private volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
    +    private ConcurrentSkipListSet<Channel> authenticated_channels = new ConcurrentSkipListSet<Channel>();
    +    private ThriftNettyServerCodec.AuthMethod authMethod;
    +
    +    public PacemakerServer(IServerMessageHandler handler, Map config){
    +        int maxWorkers = (int)config.get(Config.PACEMAKER_MAX_THREADS);
    +        this.port = (int)config.get(Config.PACEMAKER_PORT);
    +        this.handler = handler;
    +        this.topo_name = "pacemaker_server";
    +
    +        String auth = (String)config.get(Config.PACEMAKER_AUTH_METHOD);
    +        if(auth.equals("DIGEST")) {
    --- End diff --
    
    switch?
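
One way the `switch` suggestion could look, as a rough sketch. Only the `"DIGEST"` value appears in the hunk; the `"KERBEROS"` and `"NONE"` cases and the local `AuthMethod` enum are assumptions for illustration and may not match `ThriftNettyServerCodec.AuthMethod`. Switching on a `String` requires Java 7 or later and throws `NullPointerException` on `null`, hence the explicit check.

```java
public class AuthMethodSwitchSketch {
    // Hypothetical enum; the real ThriftNettyServerCodec.AuthMethod values are not shown in the diff.
    enum AuthMethod { DIGEST, KERBEROS, NONE }

    // Maps the configured string to an enum value, rejecting anything unknown.
    static AuthMethod parse(String auth) {
        if (auth == null) {
            throw new IllegalArgumentException("pacemaker.auth.method is not set");
        }
        switch (auth) {
            case "DIGEST":
                return AuthMethod.DIGEST;
            case "KERBEROS": // assumed value
                return AuthMethod.KERBEROS;
            case "NONE": // assumed value
                return AuthMethod.NONE;
            default:
                throw new IllegalArgumentException("Unknown auth method: " + auth);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("DIGEST"));
    }
}
```

Compared with a chain of `equals` calls, the `default` branch makes the handling of unexpected values explicit.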


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45128428
  
    --- Diff: docs/documentation/Pacemaker.md ---
    @@ -0,0 +1,89 @@
    +# Pacemaker
    +
    +### Introduction
    +Pacemaker is a storm daemon designed to process heartbeats from workers. As Storm is scaled up, ZooKeeper begins to become a bottleneck due to high volumes of writes from workers doing heartbeats. Lots of writes to disk and traffic across the network is generated as ZooKeeper tries to maintain consistency.
    +
    +Because heartbeats are of an ephemeral nature, they do not need to be persisted to disk or synced across nodes; an in-memory store will do. This is the role of Pacemaker. Pacemaker functions as a simple in-memory key/value store with ZooKeeper-like, directory-style keys and byte array values.
    +
    +The corresponding Pacemaker client is a plugin for the `ClusterState` interface, `org.apache.storm.pacemaker.pacemaker_state_factory`. Heartbeat calls are funneled by the `ClusterState` produced by `pacemaker_state_factory` into the Pacemaker daemon, while other set/get operations are forwarded to ZooKeeper.
    +
    +------
    +
    +### Configuration
    +
    + - `pacemaker.host` : The host that the Pacemaker daemon is running on
    + - `pacemaker.port` : The port that Pacemaker will listen on
    + - `pacemaker.max.threads` : Maximum number of threads Pacemaker daemon will use to handle requests.
    + - `pacemaker.childopts` : Any JVM parameters that need to go to the Pacemaker. (used by storm-deploy project)
    + - `pacemaker.auth.method` : The authentication method that is used (more info below)
    +
    +#### Example
    +
    +To get Pacemaker up and running, set the following option in the cluster config on all nodes:
    +```
    +storm.cluster.state.store: "org.apache.storm.pacemaker.pacemaker_state_factory"
    +```
    +
    +The Pacemaker host also needs to be set on all nodes:
    +```
    +pacemaker.host: somehost.mycompany.com
    +```
    +
    +And then start all of your daemons
    +
    +(including Pacemaker):
    +```
    +$ storm pacemaker
    +```
    +
    +The Storm cluster should now be pushing all worker heartbeats through Pacemaker.
    +
    +### Security
    +
    +Currently digest (password-based) and Kerberos security are supported. Security is currently only around reads, not writes. Writes may be performed by anyone, whereas reads may only be performed by authorized and authenticated users. This is an area for future development, as it leaves the cluster open to DoS attacks, but it prevents any sensitive information from reaching unauthorized eyes, which was the main goal.
    +
    +#### Digest
    +To configure digest authentication, set `pacemaker.auth.method: DIGEST` in the cluster config on the nodes hosting Nimbus and Pacemaker.
    +The nodes must also have `java.security.auth.login.config` set to point to a jaas config file containing the following structure:
    +```
    +PacemakerDigest {
    +    username="some username"
    +    password="some password"
    +};
    --- End diff --
    
    Do we also need a semicolon `;` after `"some password"`?


---

[GitHub] storm pull request: [STORM-885] Heartbeat Server (Pacemaker)

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45517684
  
    --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java ---
    @@ -0,0 +1,108 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.pacemaker.codec;
    +
    +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channel;
    +import backtype.storm.generated.HBMessage;
    +import backtype.storm.generated.HBMessageData;
    +import backtype.storm.generated.HBServerMessageType;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.messaging.netty.ControlMessage;
    +import backtype.storm.messaging.netty.SaslMessageToken;
    +import backtype.storm.messaging.netty.INettySerializable;
    +import java.io.IOException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.thrift.TBase;
    +
    +public class ThriftEncoder extends OneToOneEncoder {
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(ThriftEncoder.class);
    +
    +    private HBMessage encodeNettySerializable(INettySerializable netty_message,
    +                                              HBServerMessageType mType) {
    +        
    +        HBMessageData message_data = new HBMessageData();
    +        HBMessage m = new HBMessage();
    +        try {
    +            ChannelBuffer cbuffer = netty_message.buffer();
    +            if(cbuffer.hasArray()) {
    +                message_data.set_message_blob(cbuffer.array());
    +            }
    +            else {
    +                byte buff[] = new byte[netty_message.encodeLength()];
    +                cbuffer.readBytes(buff, 0, netty_message.encodeLength());
    +                message_data.set_message_blob(buff);
    +            }
    +            m.set_type(mType);
    +            m.set_data(message_data);
    +            return m;
    +        }
    +        catch( IOException e) {
    +            LOG.error("Failed to encode NettySerializable: ", e);
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
    +        if(msg == null) return null;
    --- End diff --
    
    minor: braces for `if`


---

[GitHub] storm pull request: [STORM-855] Heartbeat Server (Pacemaker)

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43559969
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -495,7 +531,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
                                         connectionAttempt);
                                 if (messagesLost.get() > 0) {
                                     LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
    -                            }
    +    }
    --- End diff --
    
    Spacing


---