You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:55 UTC

[16/37] storm git commit: Merge remote-tracking branch 'asf/master' into STORM-855

Merge remote-tracking branch 'asf/master' into STORM-855


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62d725a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62d725a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62d725a8

Branch: refs/heads/master
Commit: 62d725a85e7869290805cdbe55f1f3bce1f905de
Parents: 80c60d8 2b6884b
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Tue Nov 10 11:11:47 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Tue Nov 10 11:11:47 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                    |   26 +
 DISCLAIMER                                      |   10 -
 LICENSE                                         |   12 +
 README.markdown                                 |    3 +-
 STORM-UI-REST-API.md                            |  735 --
 bin/flight.bash                                 |  154 +
 bin/storm-config.cmd                            |   10 +-
 bin/storm.py                                    |   14 +-
 conf/defaults.yaml                              |    6 +-
 conf/storm.yaml.example                         |    2 +-
 dev-tools/travis/ratprint.py                    |   26 +
 dev-tools/travis/travis-install.sh              |    5 +
 dev-tools/travis/travis-script.sh               |    6 +
 docs/DYNAMIC_LOG_LEVEL_SETTINGS.md              |   41 -
 docs/documentation/Documentation.md             |    4 +
 docs/documentation/Log-Search.md                |   14 +
 .../Message-passing-implementation.md           |   34 +-
 .../documentation/dynamic-log-level-settings.md |   41 +
 docs/documentation/dynamic-worker-profiling.md  |   29 +
 .../images/dynamic_log_level_settings_1.png     |  Bin 0 -> 93689 bytes
 .../images/dynamic_log_level_settings_2.png     |  Bin 0 -> 78785 bytes
 .../images/dynamic_profiling_debugging_1.png    |  Bin 0 -> 93635 bytes
 .../images/dynamic_profiling_debugging_2.png    |  Bin 0 -> 138120 bytes
 .../images/dynamic_profiling_debugging_3.png    |  Bin 0 -> 96974 bytes
 docs/documentation/images/search-a-topology.png |  Bin 0 -> 671031 bytes
 .../images/search-for-a-single-worker-log.png   |  Bin 0 -> 736579 bytes
 .../storm-metrics-profiling-internal-actions.md |   70 +
 docs/documentation/ui-rest-api.md               |  984 ++
 docs/images/dynamic_log_level_settings_1.png    |  Bin 93689 -> 0 bytes
 docs/images/dynamic_log_level_settings_2.png    |  Bin 78785 -> 0 bytes
 docs/images/viewing_metrics_with_VisualVM.png   |  Bin 0 -> 225100 bytes
 .../starter/ResourceAwareExampleTopology.java   |   20 +-
 external/flux/README.md                         |    1 +
 .../main/java/org/apache/storm/flux/Flux.java   |    3 +-
 .../java/org/apache/storm/flux/FluxBuilder.java |   13 +
 .../org/apache/storm/flux/test/TestBolt.java    |    4 +
 .../resources/configs/config-methods-test.yaml  |    1 +
 external/storm-hdfs/README.md                   |   33 +
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |   17 +
 .../ha/codedistributor/HDFSCodeDistributor.java |   17 +
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    |   17 +
 .../storm/hdfs/bolt/TestSequenceFileBolt.java   |  186 +
 .../storm/hdfs/trident/HdfsStateTest.java       |   17 +
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |    2 +
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |    9 +
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |    5 +
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |    3 +
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |    5 +
 .../storm/jdbc/bolt/JdbcInsertBoltTest.java     |   71 +
 .../storm/jdbc/bolt/JdbcLookupBoltTest.java     |   59 +
 external/storm-kafka/README.md                  |   11 +
 .../jvm/storm/kafka/DynamicBrokersReader.java   |   97 +-
 .../kafka/DynamicPartitionConnections.java      |   20 +-
 .../src/jvm/storm/kafka/KafkaSpout.java         |    2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |   85 +-
 .../src/jvm/storm/kafka/Partition.java          |   26 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   18 +-
 .../src/jvm/storm/kafka/StaticCoordinator.java  |   11 +-
 .../storm/kafka/StringMultiSchemeWithTopic.java |   57 +
 .../src/jvm/storm/kafka/ZkCoordinator.java      |    2 +-
 .../jvm/storm/kafka/trident/Coordinator.java    |    7 +-
 .../trident/GlobalPartitionInformation.java     |   26 +-
 .../jvm/storm/kafka/trident/IBrokerReader.java  |    7 +-
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |    5 +-
 .../storm/kafka/trident/StaticBrokerReader.java |   23 +-
 .../kafka/trident/TridentKafkaEmitter.java      |   36 +-
 .../storm/kafka/trident/TridentKafkaState.java  |   25 +-
 .../jvm/storm/kafka/trident/ZkBrokerReader.java |   20 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   |  114 +-
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   58 +-
 .../src/test/storm/kafka/TestUtils.java         |   20 +-
 .../test/storm/kafka/TridentKafkaTopology.java  |    8 +-
 .../src/test/storm/kafka/ZkCoordinatorTest.java |    8 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |    4 +-
 external/storm-solr/pom.xml                     |   17 +
 pom.xml                                         |  132 +-
 storm-core/pom.xml                              |   47 +
 storm-core/src/clj/backtype/storm/cluster.clj   |   57 +-
 storm-core/src/clj/backtype/storm/config.clj    |   23 +
 storm-core/src/clj/backtype/storm/converter.clj |   19 +-
 .../src/clj/backtype/storm/daemon/common.clj    |    5 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |   18 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   54 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |  532 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  194 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  169 +-
 .../src/clj/backtype/storm/daemon/task.clj      |    4 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   35 +-
 .../src/clj/backtype/storm/local_state.clj      |   24 +
 .../src/clj/backtype/storm/messaging/local.clj  |   34 +-
 storm-core/src/clj/backtype/storm/stats.clj     |    9 +
 storm-core/src/clj/backtype/storm/timer.clj     |   20 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  246 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   16 +-
 storm-core/src/clj/backtype/storm/util.clj      |   12 +-
 .../src/dev/logviewer-search-context-tests.log  |    1 +
 .../dev/logviewer-search-context-tests.log.gz   |  Bin 0 -> 72 bytes
 storm-core/src/dev/small-worker.log             |    1 +
 storm-core/src/dev/test-3072.log                |    3 +
 storm-core/src/dev/test-worker.log              |  380 +
 storm-core/src/jvm/backtype/storm/Config.java   |   64 +-
 .../src/jvm/backtype/storm/LogWriter.java       |    2 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |   24 +-
 .../storm/codedistributor/ICodeDistributor.java |   17 +
 .../LocalFileSystemCodeDistributor.java         |   17 +
 .../storm/coordination/BatchBoltExecutor.java   |    4 +-
 .../storm/coordination/CoordinatedBolt.java     |   14 +-
 .../storm/drpc/DRPCInvocationsClient.java       |    5 +-
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |   10 +-
 .../src/jvm/backtype/storm/drpc/JoinResult.java |    8 +-
 .../storm/generated/AlreadyAliveException.java  |    7 +-
 .../storm/generated/ClusterSummary.java         |  111 +-
 .../backtype/storm/generated/LSTopoHistory.java |  805 ++
 .../storm/generated/LSTopoHistoryList.java      |  460 +
 .../jvm/backtype/storm/generated/Nimbus.java    | 9689 ++++++++++++------
 .../backtype/storm/generated/ProfileAction.java |   74 +
 .../storm/generated/ProfileRequest.java         |  631 ++
 .../storm/generated/TopologyHistoryInfo.java    |  461 +
 .../src/jvm/backtype/storm/grouping/Load.java   |   77 +
 .../grouping/LoadAwareCustomStreamGrouping.java |   24 +
 .../grouping/LoadAwareShuffleGrouping.java      |   76 +
 .../backtype/storm/grouping/LoadMapping.java    |   64 +
 .../storm/grouping/PartialKeyGrouping.java      |    5 +-
 .../storm/grouping/ShuffleGrouping.java         |   65 +
 .../storm/messaging/ConnectionWithStatus.java   |    4 +-
 .../backtype/storm/messaging/IConnection.java   |   16 +
 .../jvm/backtype/storm/messaging/IContext.java  |    2 +-
 .../storm/messaging/TransportFactory.java       |    2 +-
 .../backtype/storm/messaging/netty/Client.java  |   35 +-
 .../backtype/storm/messaging/netty/Context.java |    8 +-
 .../storm/messaging/netty/ControlMessage.java   |    5 +-
 .../storm/messaging/netty/MessageBatch.java     |   14 +-
 .../storm/messaging/netty/MessageDecoder.java   |    7 +-
 .../storm/messaging/netty/SaslMessageToken.java |    3 +-
 .../storm/messaging/netty/SaslNettyClient.java  |    6 +-
 .../messaging/netty/SaslStormClientHandler.java |    4 +-
 .../messaging/netty/SaslStormServerHandler.java |   11 +-
 .../storm/messaging/netty/SaslUtils.java        |   11 +-
 .../backtype/storm/messaging/netty/Server.java  |   51 +-
 .../messaging/netty/StormClientHandler.java     |   26 +-
 .../backtype/storm/metric/EventLoggerBolt.java  |   25 +-
 .../storm/metric/FileBasedEventLogger.java      |   19 +-
 .../metric/HttpForwardingMetricsConsumer.java   |    1 -
 .../metric/HttpForwardingMetricsServer.java     |    1 -
 .../jvm/backtype/storm/metric/IEventLogger.java |   25 +-
 .../storm/metric/LoggingMetricsConsumer.java    |    1 -
 .../storm/metric/MetricsConsumerBolt.java       |    1 -
 .../jvm/backtype/storm/metric/SystemBolt.java   |    5 -
 .../backtype/storm/metric/api/CountMetric.java  |    2 -
 .../backtype/storm/metric/api/MeanReducer.java  |    4 +-
 .../storm/metric/api/MultiCountMetric.java      |    2 +-
 .../storm/metric/api/MultiReducedMetric.java    |    2 +-
 .../storm/metric/api/rpc/CountShellMetric.java  |    3 +-
 .../AbstractDNSToSwitchMapping.java             |    2 +-
 .../DefaultRackDNSToSwitchMapping.java          |   21 +-
 .../backtype/storm/nimbus/ILeaderElector.java   |   23 +-
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |   21 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |   69 +-
 .../scheduler/SchedulerAssignmentImpl.java      |   15 +-
 .../storm/scheduler/SupervisorDetails.java      |    6 +-
 .../backtype/storm/scheduler/Topologies.java    |   12 +-
 .../storm/scheduler/TopologyDetails.java        |   30 +-
 .../scheduler/multitenant/DefaultPool.java      |   22 +-
 .../storm/scheduler/multitenant/FreePool.java   |    6 +-
 .../scheduler/multitenant/IsolatedPool.java     |   32 +-
 .../multitenant/MultitenantScheduler.java       |    6 +-
 .../storm/scheduler/multitenant/Node.java       |   17 +-
 .../storm/scheduler/multitenant/NodePool.java   |   16 +-
 .../strategies/ResourceAwareStrategy.java       |   69 +-
 .../backtype/storm/security/auth/AuthUtils.java |   27 +-
 .../auth/DefaultHttpCredentialsPlugin.java      |    6 +-
 .../security/auth/DefaultPrincipalToLocal.java  |    1 -
 .../storm/security/auth/IAuthorizer.java        |    4 +-
 .../security/auth/ICredentialsRenewer.java      |    3 +-
 .../security/auth/IHttpCredentialsPlugin.java   |    2 -
 .../storm/security/auth/IPrincipalToLocal.java  |    2 +-
 .../storm/security/auth/ITransportPlugin.java   |    4 -
 .../security/auth/KerberosPrincipalToLocal.java |    2 +-
 .../storm/security/auth/ReqContext.java         |   11 +-
 .../security/auth/SaslTransportPlugin.java      |   12 +-
 .../security/auth/ShellBasedGroupsMapping.java  |   10 +-
 .../security/auth/SimpleTransportPlugin.java    |    6 +-
 .../security/auth/SingleUserPrincipal.java      |    5 +-
 .../storm/security/auth/TBackoffConnect.java    |    1 -
 .../storm/security/auth/ThriftClient.java       |   10 +-
 .../storm/security/auth/ThriftServer.java       |    6 +-
 .../auth/authorizer/DRPCAuthorizerBase.java     |    2 +-
 .../authorizer/DRPCSimpleACLAuthorizer.java     |   19 +-
 .../auth/authorizer/DenyAuthorizer.java         |   11 +-
 .../authorizer/ImpersonationAuthorizer.java     |   17 +-
 .../auth/authorizer/NoopAuthorizer.java         |    7 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    |   26 +-
 .../authorizer/SimpleWhitelistAuthorizer.java   |   11 +-
 .../auth/digest/ClientCallbackHandler.java      |    2 -
 .../auth/digest/DigestSaslTransportPlugin.java  |    2 -
 .../auth/digest/ServerCallbackHandler.java      |    5 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |   10 +-
 .../security/auth/kerberos/NoOpTTrasport.java   |   20 +-
 .../serialization/BlowfishTupleSerializer.java  |    6 +-
 .../GzipThriftSerializationDelegate.java        |    1 -
 .../storm/serialization/ITupleDeserializer.java |    1 -
 .../serialization/KryoTupleDeserializer.java    |    3 -
 .../serialization/KryoValuesDeserializer.java   |    3 +-
 .../serialization/SerializationFactory.java     |   23 +-
 .../jvm/backtype/storm/spout/ShellSpout.java    |    4 +-
 .../storm/task/GeneralTopologyContext.java      |   15 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |   42 +-
 .../backtype/storm/task/TopologyContext.java    |    9 +-
 .../AlternateRackDNSToSwitchMapping.java        |   17 +
 .../storm/testing/MemoryTransactionalSpout.java |    9 +-
 .../testing/OpaqueMemoryTransactionalSpout.java |    8 +-
 .../storm/testing/TupleCaptureBolt.java         |    4 +-
 .../storm/topology/BasicBoltExecutor.java       |    2 +-
 .../storm/topology/OutputFieldsGetter.java      |    2 +-
 .../storm/topology/TopologyBuilder.java         |   16 +-
 .../storm/topology/base/BaseBatchBolt.java      |    1 -
 .../topology/base/BaseTransactionalSpout.java   |    1 -
 .../TransactionalSpoutBatchExecutor.java        |    4 +-
 .../TransactionalSpoutCoordinator.java          |    2 +-
 ...uePartitionedTransactionalSpoutExecutor.java |   13 +-
 .../PartitionedTransactionalSpoutExecutor.java  |    2 +-
 .../src/jvm/backtype/storm/tuple/Fields.java    |   10 +-
 .../src/jvm/backtype/storm/tuple/MessageId.java |   10 +-
 .../src/jvm/backtype/storm/tuple/Tuple.java     |    1 -
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |   10 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |    1 -
 .../backtype/storm/utils/DisruptorQueue.java    |    5 +-
 .../backtype/storm/utils/InprocMessaging.java   |    4 +-
 .../storm/utils/KeyedRoundRobinQueue.java       |    6 +-
 .../jvm/backtype/storm/utils/ListDelegate.java  |    6 +-
 .../jvm/backtype/storm/utils/LocalState.java    |   22 +-
 .../src/jvm/backtype/storm/utils/Monitor.java   |    3 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   10 +-
 .../storm/utils/RegisteredGlobalState.java      |    6 +-
 .../jvm/backtype/storm/utils/RotatingMap.java   |    2 +-
 .../backtype/storm/utils/ServiceRegistry.java   |    2 +-
 .../jvm/backtype/storm/utils/ShellProcess.java  |    6 +-
 .../jvm/backtype/storm/utils/ShellUtils.java    |    2 +-
 .../StormBoundedExponentialBackoffRetry.java    |    3 +-
 .../src/jvm/backtype/storm/utils/Time.java      |    4 +-
 .../backtype/storm/utils/TransferDrainer.java   |   17 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   19 +-
 .../jvm/backtype/storm/utils/VersionInfo.java   |    2 +-
 .../storm/validation/ConfigValidation.java      |  113 +-
 .../src/jvm/storm/trident/TridentTopology.java  |   87 +-
 .../trident/drpc/ReturnResultsReducer.java      |    4 +-
 .../fluent/ChainedAggregatorDeclarer.java       |    8 +-
 .../jvm/storm/trident/graph/GraphGrouper.java   |   13 +-
 .../src/jvm/storm/trident/graph/Group.java      |   23 +-
 .../trident/operation/builtin/SnapshotGet.java  |    4 +-
 .../operation/builtin/TupleCollectionGet.java   |    6 +-
 .../storm/trident/partition/GlobalGrouping.java |    5 +-
 .../trident/partition/IdentityGrouping.java     |    8 +-
 .../src/jvm/storm/trident/planner/Node.java     |    5 +-
 .../storm/trident/planner/PartitionNode.java    |    2 -
 .../storm/trident/planner/SubtopologyBolt.java  |   19 +-
 .../processor/MultiReducerProcessor.java        |    2 +-
 .../OpaquePartitionedTridentSpoutExecutor.java  |   10 +-
 .../trident/spout/TridentSpoutExecutor.java     |    4 +-
 .../trident/topology/TridentBoltExecutor.java   |    6 +-
 .../topology/TridentTopologyBuilder.java        |   23 +-
 .../storm/trident/tuple/TridentTupleView.java   |   18 +-
 .../src/native/worker-launcher/impl/main.c      |   10 +
 .../worker-launcher/impl/worker-launcher.c      |   49 +-
 .../worker-launcher/impl/worker-launcher.h      |    2 +
 storm-core/src/py/storm/Nimbus-remote           |   21 +
 storm-core/src/py/storm/Nimbus.py               |  595 ++
 storm-core/src/py/storm/ttypes.py               |  433 +-
 storm-core/src/storm.thrift                     |   38 +
 storm-core/src/ui/public/component.html         |  167 +-
 .../src/ui/public/deep_search_result.html       |  155 +
 storm-core/src/ui/public/images/search.png      |  Bin 0 -> 2354 bytes
 .../src/ui/public/js/typeahead.jquery.min.js    |    7 +
 storm-core/src/ui/public/logviewer_search.html  |   65 +
 storm-core/src/ui/public/search_result.html     |  100 +
 .../templates/component-page-template.html      |   53 +
 .../deep-search-result-page-template.html       |   66 +
 .../logviewer-search-page-template.html         |   44 +
 .../templates/search-result-page-template.html  |   60 +
 .../templates/topology-page-template.html       |   11 +
 .../src/ui/public/templates/user-template.html  |   17 +-
 storm-core/src/ui/public/topology.html          |    8 +-
 .../test/clj/backtype/storm/grouping_test.clj   |   90 +-
 .../clj/backtype/storm/integration_test.clj     |    4 +-
 .../test/clj/backtype/storm/logviewer_test.clj  |  418 +
 .../storm/messaging/netty_integration_test.clj  |    3 +-
 .../storm/messaging/netty_unit_test.clj         |  217 +-
 .../test/clj/backtype/storm/messaging_test.clj  |    3 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    3 +-
 .../storm/pacemaker_state_factory_test.clj      |   15 +
 .../clj/org/apache/storm/pacemaker_test.clj     |   15 +
 .../storm/utils/DisruptorQueueTest.java         |    9 +-
 storm-dist/binary/src/main/assembly/binary.xml  |    5 -
 293 files changed, 16992 insertions(+), 5462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/bin/storm.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index b1eb0d0,cf1ece6..35aa8c8
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -17,12 -17,12 +17,12 @@@
  (ns backtype.storm.cluster
    (:import [org.apache.zookeeper.data Stat ACL Id]
             [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
-             LogConfig]
+             LogConfig ProfileAction ProfileRequest NodeInfo]
             [java.io Serializable])
    (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
 -  (:import [org.apache.curator.framework.state ConnectionStateListener ConnectionState])
    (:import [org.apache.curator.framework CuratorFramework])
    (:import [backtype.storm.utils Utils])
 +  (:import [backtype.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
    (:import [java.security MessageDigest])
    (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
    (:import [backtype.storm.nimbus NimbusInfo])
@@@ -376,6 -509,48 +388,48 @@@
          [this storm-id log-config]
          (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))
  
+       (set-worker-profile-request
+         [this storm-id profile-request]
+         (let [request-type (.get_action profile-request)
+               host (.get_node (.get_nodeInfo profile-request))
+               port (first (.get_port (.get_nodeInfo profile-request)))]
+           (.set_data cluster-state
+                      (profiler-config-path storm-id host port request-type)
+                      (Utils/serialize profile-request)
+                      acls)))
+ 
+       (get-topology-profile-requests
+         [this storm-id thrift?]
+         (let [path (profiler-config-path storm-id)
 -              requests (if (exists-node? cluster-state path false)
++              requests (if (.node_exists cluster-state path false)
+                          (dofor [c (.get_children cluster-state path false)]
+                                 (let [raw (.get_data cluster-state (str path "/" c) false)
+                                       request (maybe-deserialize raw ProfileRequest)]
+                                       (if thrift?
+                                         request
+                                         (clojurify-profile-request request)))))]
+           requests))
+ 
+       (delete-topology-profile-requests
+         [this storm-id profile-request]
+         (let [profile-request-inst (thriftify-profile-request profile-request)
+               action (:action profile-request)
+               host (:host profile-request)
+               port (:port profile-request)]
+           (.delete_node cluster-state
+            (profiler-config-path storm-id host port action))))
+           
+       (get-worker-profile-requests
+         [this storm-id node-info thrift?]
+         (let [host (:host node-info)
+               port (:port node-info)
+               profile-requests (get-topology-profile-requests this storm-id thrift?)]
+           (if thrift?
+             (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo  %)))))
+                     profile-requests)
+             (filter #(and (= host (:host %)) (= port (:port %)))
+                     profile-requests))))
+       
        (worker-heartbeat!
          [this storm-id node port info]
          (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
@@@ -482,10 -657,11 +536,11 @@@
  
        (remove-storm!
          [this storm-id]
 -        (delete-node cluster-state (assignment-path storm-id))
 -        (delete-node cluster-state (code-distributor-path storm-id))
 -        (delete-node cluster-state (credentials-path storm-id))
 -        (delete-node cluster-state (log-config-path storm-id))
 -        (delete-node cluster-state (profiler-config-path storm-id))
 +        (.delete_node cluster-state (assignment-path storm-id))
 +        (.delete_node cluster-state (code-distributor-path storm-id))
 +        (.delete_node cluster-state (credentials-path storm-id))
 +        (.delete_node cluster-state (log-config-path storm-id))
++        (.delete_node cluster-state (profiler-config-path storm-id))
          (remove-storm-base! this storm-id))
  
        (set-credentials!

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/config.clj
index 89b4a30,1a5cb51..0700a37
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@@ -265,7 -273,16 +273,22 @@@
    [conf id]
    (LocalState. (worker-heartbeats-root conf id)))
  
 +(defn override-login-config-with-system-property [conf]
 +  (if-let [login_conf_file (System/getProperty "java.security.auth.login.config")]
 +    (assoc conf "java.security.auth.login.config" login_conf_file)
 +    conf))
++
+ (defn get-topo-logs-users
+   [topology-conf]
+   (sort (distinct (remove nil?
+                     (concat
+                       (topology-conf LOGS-USERS)
+                       (topology-conf TOPOLOGY-USERS))))))
+ 
+ (defn get-topo-logs-groups
+   [topology-conf]
+   (sort (distinct (remove nil?
+                     (concat
+                       (topology-conf LOGS-GROUPS)
+                       (topology-conf TOPOLOGY-GROUPS))))))
++

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index bffd953,e4507f5..769d010
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@@ -39,11 -39,10 +39,10 @@@ public enum ControlMessage implements I
      }
  
      /**
-      * Return a control message per an encoded status code
-      * @param encoded
-      * @return
+      * @param encoded status code
+      * @return a control message per an encoded status code
       */
 -    static ControlMessage mkMessage(short encoded) {
 +    public static ControlMessage mkMessage(short encoded) {
          for(ControlMessage cm: ControlMessage.values()) {
            if(encoded == cm.code) return cm;
          }

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index 2fe5c2d,d7a86d1..70e7089
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@@ -27,14 -24,7 +27,13 @@@ import org.slf4j.LoggerFactory
  /**
   * Send and receive SASL tokens.
   */
- 
 -public class SaslMessageToken {
 +public class SaslMessageToken implements INettySerializable {
 +    public static final short IDENTIFIER = -500;
 +
 +    /** Class logger */
 +    private static final Logger LOG = LoggerFactory
 +            .getLogger(SaslMessageToken.class);
 +
      /** Used for client or server's token to send or receive from each other. */
      private byte[] token;
  
@@@ -92,9 -82,8 +91,9 @@@
          if (token != null)
              payload_len = token.length;
  
- 
 -        bout.writeShort(identifier);
 -        bout.writeInt(payload_len);
 +        bout.writeShort(IDENTIFIER);
 +        bout.writeInt((int) payload_len);
++
          if (payload_len > 0) {
              bout.write(token);
          }

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 15fe9fb,5ce90a3..2836e80
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@@ -121,9 -121,7 +119,8 @@@ public class SaslStormServerHandler ext
                  LOG.debug("Removing SaslServerHandler from pipeline since SASL "
                          + "authentication is complete.");
                  ctx.getPipeline().remove(this);
 +                server.authenticated(channel);
              }
-             return;
          } else {
              // Client should not be sending other-than-SASL messages before
              // SaslServerHandler has removed itself from the pipeline. Such

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 719c84c,32c2bd7..5f23064
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@@ -17,20 -17,6 +17,21 @@@
   */
  package backtype.storm.messaging.netty;
  
 +import backtype.storm.Config;
++import backtype.storm.grouping.Load;
 +import backtype.storm.messaging.TaskMessage;
 +import backtype.storm.metric.api.IStatefulObject;
 +import backtype.storm.serialization.KryoValuesSerializer;
 +import backtype.storm.utils.Utils;
 +import org.jboss.netty.bootstrap.ServerBootstrap;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelFactory;
 +import org.jboss.netty.channel.group.ChannelGroup;
 +import org.jboss.netty.channel.group.DefaultChannelGroup;
 +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
  import java.net.InetSocketAddress;
  import java.util.ArrayList;
  import java.util.Arrays;
@@@ -46,7 -32,23 +47,8 @@@ import java.util.concurrent.LinkedBlock
  import java.util.concurrent.ThreadFactory;
  import java.io.IOException;
  
 -import org.jboss.netty.bootstrap.ServerBootstrap;
 -import org.jboss.netty.channel.Channel;
 -import org.jboss.netty.channel.ChannelFactory;
 -import org.jboss.netty.channel.group.ChannelGroup;
 -import org.jboss.netty.channel.group.DefaultChannelGroup;
 -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 -
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
+ 
 -import backtype.storm.Config;
 -import backtype.storm.grouping.Load;
  import backtype.storm.messaging.ConnectionWithStatus;
 -import backtype.storm.messaging.TaskMessage;
 -import backtype.storm.metric.api.IStatefulObject;
 -import backtype.storm.serialization.KryoValuesSerializer;
 -import backtype.storm.utils.Utils;
  
  class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer {
  
@@@ -54,10 -56,10 +56,10 @@@
      @SuppressWarnings("rawtypes")
      Map storm_conf;
      int port;
-     private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<String, AtomicInteger>();
+     private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
      private final AtomicInteger messagesDequeued = new AtomicInteger(0);
      private final AtomicInteger[] pendingMessages;
 -    
 +
      // Create multiple queues for incoming messages. The size equals the number of receiver threads.
      // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
      private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
@@@ -79,18 -81,18 +81,18 @@@
          this.storm_conf = storm_conf;
          this.port = port;
          _ser = new KryoValuesSerializer(storm_conf);
 -        
 +
          queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
          roundRobinQueueId = 0;
-         taskToQueueId = new HashMap<Integer, Integer>();
+         taskToQueueId = new HashMap<>();
 -    
 +
          message_queue = new LinkedBlockingQueue[queueCount];
          pendingMessages = new AtomicInteger[queueCount];
          for (int i = 0; i < queueCount; i++) {
-             message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+             message_queue[i] = new LinkedBlockingQueue<>();
              pendingMessages[i] = new AtomicInteger(0);
          }
 -        
 +
          // Configure the server.
          int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
          int backlog = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500);
@@@ -237,18 -236,17 +236,17 @@@
          }
          return null;
      }
 -   
 +
      /**
       * register a newly created channel
-      * @param channel
+      * @param channel newly created channel
       */
      protected void addChannel(Channel channel) {
          allChannels.add(channel);
      }
 -    
 +
      /**
-      * close a channel
-      * @param channel
+      * @param channel channel to close
       */
      public void closeChannel(Channel channel) {
          channel.close().awaitUninterruptibly();

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 2c8eae9,877b6d8..696a2fc
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@@ -51,15 -51,31 +51,31 @@@ public class StormClientHandler extend
          //examine the response message from server
          Object message = event.getMessage();
          if (message instanceof ControlMessage) {
-           ControlMessage msg = (ControlMessage)message;
-           if (msg==ControlMessage.FAILURE_RESPONSE)
-               LOG.info("failure response:{}", msg);
- 
+             ControlMessage msg = (ControlMessage)message;
+             if (msg==ControlMessage.FAILURE_RESPONSE) {
+                 LOG.info("failure response:{}", msg);
+             }
+         } else if (message instanceof List) {
+             try {
+                 //This should be the metrics, and there should only be one of them
+                 List<TaskMessage> list = (List<TaskMessage>)message;
+                 if (list.size() < 1) throw new RuntimeException("Didn't see enough load metrics ("+client.getDstAddress()+") "+list);
+                 if (list.size() != 1) LOG.warn("Messages are not being delivered fast enough, got "+list.size()+" metrics messages at once("+client.getDstAddress()+")");
+                 TaskMessage tm = ((List<TaskMessage>)message).get(list.size() - 1);
+                 if (tm.task() != -1) throw new RuntimeException("Metrics messages are sent to the system task ("+client.getDstAddress()+") "+tm);
+                 List metrics = _des.deserialize(tm.message());
+                 if (metrics.size() < 1) throw new RuntimeException("No metrics data in the metrics message ("+client.getDstAddress()+") "+metrics);
+                 if (!(metrics.get(0) instanceof Map)) throw new RuntimeException("The metrics did not have a map in the first slot ("+client.getDstAddress()+") "+metrics);
+                 client.setLoadMetrics((Map<Integer, Double>)metrics.get(0));
+             } catch (IOException e) {
+                 throw new RuntimeException(e);
+             }
          } else {
-           throw new RuntimeException("Don't know how to handle a message of type "+message+" ("+client.getDstAddress()+")");
+             throw new RuntimeException("Don't know how to handle a message of type "
+                                        + message + " (" + client.getDstAddress() + ")");
          }
      }
 -
 +        
      @Override
      public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
          client.notifyInterestChanged(e.getChannel());

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
index cd79f4e,8062b4e..943199c
--- a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
@@@ -34,11 -32,7 +32,10 @@@ import java.net.URI
  import java.util.Collection;
  import java.util.Set;
  import java.util.HashSet;
 +import java.util.HashMap;
  import java.util.Map;
 +import java.util.SortedMap;
 +import java.util.TreeMap;
- import java.util.concurrent.ExecutorService;
  
  public class AuthUtils {
      private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@@ -77,39 -68,8 +74,39 @@@
      }
  
      /**
 +     * Pull a set of keys out of a Configuration.
 +     * @param configs_to_pull A set of config keys that you want the values of.
 +     * @param conf The config to pull the key/value pairs out of.
 +     * @param conf_entry The app configuration entry name to get stuff from.
 +     * @return Return a map of the configs in configs_to_pull to their values.
 +     */
 +    public static SortedMap<String, ?> PullConfig(Configuration conf,
 +                                            String conf_entry) throws IOException {
 +        if(conf == null) {
 +            return null;
 +        }
 +        AppConfigurationEntry configurationEntries[] = conf.getAppConfigurationEntry(conf_entry);
 +        if(configurationEntries == null) {
 +            String errorMessage = "Could not find a '" + conf_entry
 +                + "' entry in this configuration: Client cannot start.";
 +            throw new IOException(errorMessage);
 +        }
 +
 +        TreeMap<String, Object> results = new TreeMap<>();
 +        
 +
 +        for(AppConfigurationEntry entry: configurationEntries) {
 +            Map<String, ?> options = entry.getOptions();
 +            for(String key : options.keySet()) {
 +                results.put(key, options.get(key));
 +            }
 +        }
 +        return results;
 +    }
 +
 +    /**
       * Construct a principal to local plugin
-      * @param conf storm configuration
+      * @param storm_conf storm configuration
       * @return the plugin
       */
      public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/storm.thrift
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
index 5024922,0000000..813ae84
mode 100644,000000..100644
--- a/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
@@@ -1,135 -1,0 +1,150 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
 +(ns org.apache.storm.pacemaker-state-factory-test
 +  (:require [clojure.test :refer :all]
 +            [conjure.core :refer :all]
 +            [org.apache.storm.pacemaker [pacemaker-state-factory :as psf]])
 +  (:import [backtype.storm.generated
 +            HBExecutionException HBNodes HBRecords
 +            HBServerMessageType HBMessage HBMessageData HBPulse]
 +           [backtype.storm.cluster ClusterStateContext]
 +           [org.mockito Mockito Matchers]))
 +
 +(defn- string-to-bytes [string]
 +  (byte-array (map int string)))
 +
 +(defn- bytes-to-string [bytez]
 +  (apply str (map char bytez)))
 +
 +(defprotocol send-capture
 +  (send [this something])
 +  (check-captured [this]))
 +
 +(defn- make-send-capture [response]
 +  (let [captured (atom nil)]
 +    (reify send-capture
 +      (send [this something] (reset! captured something) response)
 +      (check-captured [this] @captured))))
 +
 +(defmacro with-mock-pacemaker-client-and-state [client state response & body]
 +  `(let [~client (make-send-capture ~response)]
 +     (stubbing [psf/makeZKState nil
 +                psf/makeClient ~client]
 +               (let [~state (psf/-mkState nil nil nil nil (ClusterStateContext.))]
 +                 ~@body))))
 +
 +
 +(deftest pacemaker_state_set_worker_hb
 +  (testing "set_worker_hb"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil)
 +
 +      (.set_worker_hb state "/foo" (string-to-bytes "data") nil)
 +      (let [sent (.check-captured client)
 +            pulse (.get_pulse (.get_data sent))]
 +        (is (= (.get_type sent) HBServerMessageType/SEND_PULSE))
 +        (is (= (.get_id pulse) "/foo"))
 +        (is (= (bytes-to-string (.get_details pulse)) "data")))))
 +
 +  (testing "set_worker_hb"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/SEND_PULSE nil)
 +
 +      (is (thrown? HBExecutionException      
 +                   (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
 +
 +      
 +
 +(deftest pacemaker_state_delete_worker_hb
 +  (testing "delete_worker_hb"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil)
 +
 +      (.delete_worker_hb state "/foo/bar")
 +      (let [sent (.check-captured client)]
 +        (is (= (.get_type sent) HBServerMessageType/DELETE_PATH))
 +        (is (= (.get_path (.get_data sent)) "/foo/bar")))))
 +
 +    (testing "delete_worker_hb"
 +      (with-mock-pacemaker-client-and-state
 +        client state
 +        (HBMessage. HBServerMessageType/DELETE_PATH nil)
 +        
 +        (is (thrown? HBExecutionException
 +                     (.delete_worker_hb state "/foo/bar"))))))
 +
 +(deftest pacemaker_state_get_worker_hb
 +  (testing "get_worker_hb"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE
 +                (HBMessageData/pulse
 +                 (doto (HBPulse.)
 +                   (.set_id "/foo")
 +                   (.set_details (string-to-bytes "some data")))))
 +
 +      (.get_worker_hb state "/foo" false)
 +      (let [sent (.check-captured client)]
 +        (is (= (.get_type sent) HBServerMessageType/GET_PULSE))
 +        (is (= (.get_path (.get_data sent)) "/foo")))))
 +
 +  (testing "get_worker_hb - fail (bad response)"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/GET_PULSE nil)
 +      
 +      (is (thrown? HBExecutionException
 +                   (.get_worker_hb state "/foo" false)))))
 +  
 +  (testing "get_worker_hb - fail (bad data)"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE nil)
 +      
 +      (is (thrown? HBExecutionException
 +                   (.get_worker_hb state "/foo" false))))))
 +
 +(deftest pacemaker_state_get_worker_hb_children
 +  (testing "get_worker_hb_children"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
 +                (HBMessageData/nodes
 +                 (HBNodes. [])))
 +
 +      (.get_worker_hb_children state "/foo" false)
 +      (let [sent (.check-captured client)]
 +        (is (= (.get_type sent) HBServerMessageType/GET_ALL_NODES_FOR_PATH))
 +        (is (= (.get_path (.get_data sent)) "/foo")))))
 +
 +  (testing "get_worker_hb_children - fail (bad response)"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/DELETE_PATH nil)
 +
 +      (is (thrown? HBExecutionException
 +                   (.get_worker_hb_children state "/foo" false)))))
 +
 +    (testing "get_worker_hb_children - fail (bad data)"
 +    (with-mock-pacemaker-client-and-state
 +      client state
 +      (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE nil)
 +      
 +      (is (thrown? HBExecutionException
 +                   (.get_worker_hb_children state "/foo" false))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/test/clj/org/apache/storm/pacemaker_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_test.clj
index ca7c693,0000000..4c5359f
mode 100644,000000..100644
--- a/storm-core/test/clj/org/apache/storm/pacemaker_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_test.clj
@@@ -1,227 -1,0 +1,242 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
 +(ns org.apache.storm.pacemaker-test
 +  (:require [clojure.test :refer :all]
 +            [org.apache.storm.pacemaker [pacemaker :as pacemaker]]
 +            [conjure.core :as conjure])
 +  (:import [backtype.storm.generated
 +            HBExecutionException HBNodes HBRecords
 +            HBServerMessageType HBMessage HBMessageData HBPulse]))
 +
 +(defn- message-with-rand-id [type data]
 +  (let [mid (rand-int 1000)
 +        message (HBMessage. type data)]
 +    (.set_message_id message mid)
 +    [message mid]))
 +
 +(defn- string-to-bytes [string]
 +  (byte-array (map int string)))
 +
 +(defn- bytes-to-string [bytez]
 +  (apply str (map char bytez)))
 +
 +(defn- makenode [handler path]
 +  (.handleMessage handler
 +                  (HBMessage.
 +                   HBServerMessageType/SEND_PULSE
 +                   (HBMessageData/pulse
 +                    (doto (HBPulse.)
 +                      (.set_id path)
 +                      (.set_details (string-to-bytes "nothing")))))
 +                  true))
 +
 +(deftest pacemaker-server-create-path
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "CREATE_PATH"
 +       (let [[message mid] (message-with-rand-id
 +                            HBServerMessageType/CREATE_PATH
 +                            (HBMessageData/path "/testpath"))
 +             response (.handleMessage handler message true)]
 +         (is (= (.get_message_id response) mid))
 +         (is (= (.get_type response) HBServerMessageType/CREATE_PATH_RESPONSE))
 +         (is (= (.get_data response) nil)))))))
 +
 +(deftest pacemaker-server-exists
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "EXISTS - false"
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/EXISTS
 +                                                 (HBMessageData/path "/testpath"))
 +             bad-response (.handleMessage handler message false)
 +             good-response (.handleMessage handler message true)]
 +         (is (= (.get_message_id bad-response) mid))
 +         (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
 +
 +         (is (= (.get_message_id good-response) mid))
 +         (is (= (.get_type good-response) HBServerMessageType/EXISTS_RESPONSE))
 +         (is (= (.get_boolval (.get_data good-response)) false))))
 +
 +     (testing "EXISTS - true"
 +       (let [path "/exists_path"
 +             data-string "pulse data"]
 +         (let [[send _] (message-with-rand-id
 +                         HBServerMessageType/SEND_PULSE
 +                         (HBMessageData/pulse
 +                          (doto (HBPulse.)
 +                            (.set_id path)
 +                            (.set_details (string-to-bytes data-string)))))
 +               _ (.handleMessage handler send true)
 +               [message mid] (message-with-rand-id HBServerMessageType/EXISTS
 +                                                   (HBMessageData/path path))
 +               bad-response (.handleMessage handler message false)
 +               good-response (.handleMessage handler message true)]
 +           (is (= (.get_message_id bad-response) mid))
 +           (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
 +
 +           (is (= (.get_message_id good-response) mid))
 +           (is (= (.get_type good-response) HBServerMessageType/EXISTS_RESPONSE))
 +           (is (= (.get_boolval (.get_data good-response)) true))))))))
 +
 +(deftest pacemaker-server-send-pulse-get-pulse
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "SEND_PULSE - GET_PULSE"
 +       (let [path "/pulsepath"
 +             data-string "pulse data"]
 +         (let [[message mid] (message-with-rand-id
 +                              HBServerMessageType/SEND_PULSE
 +                              (HBMessageData/pulse
 +                               (doto (HBPulse.)
 +                                 (.set_id path)
 +                                 (.set_details (string-to-bytes data-string)))))
 +               response (.handleMessage handler message true)]
 +           (is (= (.get_message_id response) mid))
 +           (is (= (.get_type response) HBServerMessageType/SEND_PULSE_RESPONSE))
 +           (is (= (.get_data response) nil)))
 +         (let [[message mid] (message-with-rand-id
 +                              HBServerMessageType/GET_PULSE
 +                              (HBMessageData/path path))
 +               response (.handleMessage handler message true)]
 +           (is (= (.get_message_id response) mid))
 +           (is (= (.get_type response) HBServerMessageType/GET_PULSE_RESPONSE))
 +           (is (= (bytes-to-string (.get_details (.get_pulse (.get_data response)))) data-string))))))))
 +
 +(deftest pacemaker-server-get-all-pulse-for-path
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "GET_ALL_PULSE_FOR_PATH"
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_PULSE_FOR_PATH
 +                                                 (HBMessageData/path "/testpath"))
 +             bad-response (.handleMessage handler message false)
 +             good-response (.handleMessage handler message true)]
 +         (is (= (.get_message_id bad-response) mid))
 +         (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
 +
 +         (is (= (.get_message_id good-response) mid))
 +         (is (= (.get_type good-response) HBServerMessageType/GET_ALL_PULSE_FOR_PATH_RESPONSE))
 +         (is (= (.get_data good-response) nil)))))))
 +
 +(deftest pacemaker-server-get-all-nodes-for-path
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "GET_ALL_NODES_FOR_PATH"
 +       (makenode handler "/some-root-path/foo")
 +       (makenode handler "/some-root-path/bar")
 +       (makenode handler "/some-root-path/baz")
 +       (makenode handler "/some-root-path/boo")
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH
 +                                                 (HBMessageData/path "/some-root-path"))
 +             bad-response (.handleMessage handler message false)
 +             good-response (.handleMessage handler message true)
 +             ids (into #{} (.get_pulseIds (.get_nodes (.get_data good-response))))]
 +         (is (= (.get_message_id bad-response) mid))
 +         (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
 +
 +         (is (= (.get_message_id good-response) mid))
 +         (is (= (.get_type good-response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
 +         (is (contains? ids "foo"))
 +         (is (contains? ids "bar"))
 +         (is (contains? ids "baz"))
 +         (is (contains? ids "boo")))
 +
 +       (makenode handler "/some/deeper/path/foo")
 +       (makenode handler "/some/deeper/path/bar")
 +       (makenode handler "/some/deeper/path/baz")
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH
 +                                                 (HBMessageData/path "/some/deeper/path"))
 +             bad-response (.handleMessage handler message false)
 +             good-response (.handleMessage handler message true)
 +             ids (into #{} (.get_pulseIds (.get_nodes (.get_data good-response))))]
 +         (is (= (.get_message_id bad-response) mid))
 +         (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
 +
 +         (is (= (.get_message_id good-response) mid))
 +         (is (= (.get_type good-response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
 +         (is (contains? ids "foo"))
 +         (is (contains? ids "bar"))
 +         (is (contains? ids "baz")))))))
 +
 +(deftest pacemaker-server-get-pulse
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "GET_PULSE"
 +       (makenode handler "/some-root/GET_PULSE")
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/GET_PULSE
 +                                                 (HBMessageData/path "/some-root/GET_PULSE"))
 +             bad-response (.handleMessage handler message false)
 +             good-response (.handleMessage handler message true)
 +             good-pulse (.get_pulse (.get_data good-response))]
 +         (is (= (.get_message_id bad-response) mid))
 +         (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
 +         (is (= (.get_data bad-response) nil))
 +
 +         (is (= (.get_message_id good-response) mid))
 +         (is (= (.get_type good-response) HBServerMessageType/GET_PULSE_RESPONSE))
 +         (is (= (.get_id good-pulse) "/some-root/GET_PULSE"))
 +         (is (= (bytes-to-string (.get_details good-pulse)) "nothing")))))))
 +
 +(deftest pacemaker-server-delete-path
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "DELETE_PATH"
 +       (makenode handler "/some-root/DELETE_PATH/foo")
 +       (makenode handler "/some-root/DELETE_PATH/bar")
 +       (makenode handler "/some-root/DELETE_PATH/baz")
 +       (makenode handler "/some-root/DELETE_PATH/boo")
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/DELETE_PATH
 +                                                 (HBMessageData/path "/some-root/DELETE_PATH"))
 +             response (.handleMessage handler message true)]
 +         (is (= (.get_message_id response) mid))
 +         (is (= (.get_type response) HBServerMessageType/DELETE_PATH_RESPONSE))
 +         (is (= (.get_data response) nil)))
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH
 +                                                 (HBMessageData/path "/some-root/DELETE_PATH"))
 +             response (.handleMessage handler message true)
 +             ids (into #{} (.get_pulseIds (.get_nodes (.get_data response))))]
 +         (is (= (.get_message_id response) mid))
 +         (is (= (.get_type response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
 +         (is (empty? ids)))))))
 +
 +(deftest pacemaker-server-delete-pulse-id
 +  (conjure/stubbing
 +   [pacemaker/register nil]
 +   (let [handler (pacemaker/mk-handler {})]
 +     (testing "DELETE_PULSE_ID"
 +       (makenode handler "/some-root/DELETE_PULSE_ID/foo")
 +       (makenode handler "/some-root/DELETE_PULSE_ID/bar")
 +       (makenode handler "/some-root/DELETE_PULSE_ID/baz")
 +       (makenode handler "/some-root/DELETE_PULSE_ID/boo")
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/DELETE_PULSE_ID
 +                                                 (HBMessageData/path "/some-root/DELETE_PULSE_ID/foo"))
 +             response (.handleMessage handler message true)]
 +         (is (= (.get_message_id response) mid))
 +         (is (= (.get_type response) HBServerMessageType/DELETE_PULSE_ID_RESPONSE))
 +         (is (= (.get_data response) nil)))
 +       (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH
 +                                                 (HBMessageData/path "/some-root/DELETE_PULSE_ID"))
 +             response (.handleMessage handler message true)
 +             ids (into #{} (.get_pulseIds (.get_nodes (.get_data response))))]
 +         (is (= (.get_message_id response) mid))
 +         (is (= (.get_type response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
 +         (is (not (contains? ids "foo"))))))))