You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:55 UTC
[16/37] storm git commit: Merge remote-tracking branch 'asf/master'
into STORM-855
Merge remote-tracking branch 'asf/master' into STORM-855
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62d725a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62d725a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62d725a8
Branch: refs/heads/master
Commit: 62d725a85e7869290805cdbe55f1f3bce1f905de
Parents: 80c60d8 2b6884b
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Tue Nov 10 11:11:47 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Tue Nov 10 11:11:47 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 26 +
DISCLAIMER | 10 -
LICENSE | 12 +
README.markdown | 3 +-
STORM-UI-REST-API.md | 735 --
bin/flight.bash | 154 +
bin/storm-config.cmd | 10 +-
bin/storm.py | 14 +-
conf/defaults.yaml | 6 +-
conf/storm.yaml.example | 2 +-
dev-tools/travis/ratprint.py | 26 +
dev-tools/travis/travis-install.sh | 5 +
dev-tools/travis/travis-script.sh | 6 +
docs/DYNAMIC_LOG_LEVEL_SETTINGS.md | 41 -
docs/documentation/Documentation.md | 4 +
docs/documentation/Log-Search.md | 14 +
.../Message-passing-implementation.md | 34 +-
.../documentation/dynamic-log-level-settings.md | 41 +
docs/documentation/dynamic-worker-profiling.md | 29 +
.../images/dynamic_log_level_settings_1.png | Bin 0 -> 93689 bytes
.../images/dynamic_log_level_settings_2.png | Bin 0 -> 78785 bytes
.../images/dynamic_profiling_debugging_1.png | Bin 0 -> 93635 bytes
.../images/dynamic_profiling_debugging_2.png | Bin 0 -> 138120 bytes
.../images/dynamic_profiling_debugging_3.png | Bin 0 -> 96974 bytes
docs/documentation/images/search-a-topology.png | Bin 0 -> 671031 bytes
.../images/search-for-a-single-worker-log.png | Bin 0 -> 736579 bytes
.../storm-metrics-profiling-internal-actions.md | 70 +
docs/documentation/ui-rest-api.md | 984 ++
docs/images/dynamic_log_level_settings_1.png | Bin 93689 -> 0 bytes
docs/images/dynamic_log_level_settings_2.png | Bin 78785 -> 0 bytes
docs/images/viewing_metrics_with_VisualVM.png | Bin 0 -> 225100 bytes
.../starter/ResourceAwareExampleTopology.java | 20 +-
external/flux/README.md | 1 +
.../main/java/org/apache/storm/flux/Flux.java | 3 +-
.../java/org/apache/storm/flux/FluxBuilder.java | 13 +
.../org/apache/storm/flux/test/TestBolt.java | 4 +
.../resources/configs/config-methods-test.yaml | 1 +
external/storm-hdfs/README.md | 33 +
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 17 +
.../ha/codedistributor/HDFSCodeDistributor.java | 17 +
.../hdfs/bolt/AvroGenericRecordBoltTest.java | 17 +
.../storm/hdfs/bolt/TestSequenceFileBolt.java | 186 +
.../storm/hdfs/trident/HdfsStateTest.java | 17 +
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 2 +
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 9 +
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +
.../jdbc/mapper/SimpleJdbcLookupMapper.java | 3 +
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 5 +
.../storm/jdbc/bolt/JdbcInsertBoltTest.java | 71 +
.../storm/jdbc/bolt/JdbcLookupBoltTest.java | 59 +
external/storm-kafka/README.md | 11 +
.../jvm/storm/kafka/DynamicBrokersReader.java | 97 +-
.../kafka/DynamicPartitionConnections.java | 20 +-
.../src/jvm/storm/kafka/KafkaSpout.java | 2 +-
.../src/jvm/storm/kafka/KafkaUtils.java | 85 +-
.../src/jvm/storm/kafka/Partition.java | 26 +-
.../src/jvm/storm/kafka/PartitionManager.java | 18 +-
.../src/jvm/storm/kafka/StaticCoordinator.java | 11 +-
.../storm/kafka/StringMultiSchemeWithTopic.java | 57 +
.../src/jvm/storm/kafka/ZkCoordinator.java | 2 +-
.../jvm/storm/kafka/trident/Coordinator.java | 7 +-
.../trident/GlobalPartitionInformation.java | 26 +-
.../jvm/storm/kafka/trident/IBrokerReader.java | 7 +-
.../kafka/trident/OpaqueTridentKafkaSpout.java | 5 +-
.../storm/kafka/trident/StaticBrokerReader.java | 23 +-
.../kafka/trident/TridentKafkaEmitter.java | 36 +-
.../storm/kafka/trident/TridentKafkaState.java | 25 +-
.../jvm/storm/kafka/trident/ZkBrokerReader.java | 20 +-
.../storm/kafka/DynamicBrokersReaderTest.java | 114 +-
.../src/test/storm/kafka/KafkaUtilsTest.java | 58 +-
.../src/test/storm/kafka/TestUtils.java | 20 +-
.../test/storm/kafka/TridentKafkaTopology.java | 8 +-
.../src/test/storm/kafka/ZkCoordinatorTest.java | 8 +-
.../test/storm/kafka/bolt/KafkaBoltTest.java | 4 +-
external/storm-solr/pom.xml | 17 +
pom.xml | 132 +-
storm-core/pom.xml | 47 +
storm-core/src/clj/backtype/storm/cluster.clj | 57 +-
storm-core/src/clj/backtype/storm/config.clj | 23 +
storm-core/src/clj/backtype/storm/converter.clj | 19 +-
.../src/clj/backtype/storm/daemon/common.clj | 5 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 18 +-
.../src/clj/backtype/storm/daemon/executor.clj | 54 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 532 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 194 +-
.../clj/backtype/storm/daemon/supervisor.clj | 169 +-
.../src/clj/backtype/storm/daemon/task.clj | 4 +-
.../src/clj/backtype/storm/daemon/worker.clj | 35 +-
.../src/clj/backtype/storm/local_state.clj | 24 +
.../src/clj/backtype/storm/messaging/local.clj | 34 +-
storm-core/src/clj/backtype/storm/stats.clj | 9 +
storm-core/src/clj/backtype/storm/timer.clj | 20 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 246 +-
.../src/clj/backtype/storm/ui/helpers.clj | 16 +-
storm-core/src/clj/backtype/storm/util.clj | 12 +-
.../src/dev/logviewer-search-context-tests.log | 1 +
.../dev/logviewer-search-context-tests.log.gz | Bin 0 -> 72 bytes
storm-core/src/dev/small-worker.log | 1 +
storm-core/src/dev/test-3072.log | 3 +
storm-core/src/dev/test-worker.log | 380 +
storm-core/src/jvm/backtype/storm/Config.java | 64 +-
.../src/jvm/backtype/storm/LogWriter.java | 2 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 24 +-
.../storm/codedistributor/ICodeDistributor.java | 17 +
.../LocalFileSystemCodeDistributor.java | 17 +
.../storm/coordination/BatchBoltExecutor.java | 4 +-
.../storm/coordination/CoordinatedBolt.java | 14 +-
.../storm/drpc/DRPCInvocationsClient.java | 5 +-
.../src/jvm/backtype/storm/drpc/DRPCSpout.java | 10 +-
.../src/jvm/backtype/storm/drpc/JoinResult.java | 8 +-
.../storm/generated/AlreadyAliveException.java | 7 +-
.../storm/generated/ClusterSummary.java | 111 +-
.../backtype/storm/generated/LSTopoHistory.java | 805 ++
.../storm/generated/LSTopoHistoryList.java | 460 +
.../jvm/backtype/storm/generated/Nimbus.java | 9689 ++++++++++++------
.../backtype/storm/generated/ProfileAction.java | 74 +
.../storm/generated/ProfileRequest.java | 631 ++
.../storm/generated/TopologyHistoryInfo.java | 461 +
.../src/jvm/backtype/storm/grouping/Load.java | 77 +
.../grouping/LoadAwareCustomStreamGrouping.java | 24 +
.../grouping/LoadAwareShuffleGrouping.java | 76 +
.../backtype/storm/grouping/LoadMapping.java | 64 +
.../storm/grouping/PartialKeyGrouping.java | 5 +-
.../storm/grouping/ShuffleGrouping.java | 65 +
.../storm/messaging/ConnectionWithStatus.java | 4 +-
.../backtype/storm/messaging/IConnection.java | 16 +
.../jvm/backtype/storm/messaging/IContext.java | 2 +-
.../storm/messaging/TransportFactory.java | 2 +-
.../backtype/storm/messaging/netty/Client.java | 35 +-
.../backtype/storm/messaging/netty/Context.java | 8 +-
.../storm/messaging/netty/ControlMessage.java | 5 +-
.../storm/messaging/netty/MessageBatch.java | 14 +-
.../storm/messaging/netty/MessageDecoder.java | 7 +-
.../storm/messaging/netty/SaslMessageToken.java | 3 +-
.../storm/messaging/netty/SaslNettyClient.java | 6 +-
.../messaging/netty/SaslStormClientHandler.java | 4 +-
.../messaging/netty/SaslStormServerHandler.java | 11 +-
.../storm/messaging/netty/SaslUtils.java | 11 +-
.../backtype/storm/messaging/netty/Server.java | 51 +-
.../messaging/netty/StormClientHandler.java | 26 +-
.../backtype/storm/metric/EventLoggerBolt.java | 25 +-
.../storm/metric/FileBasedEventLogger.java | 19 +-
.../metric/HttpForwardingMetricsConsumer.java | 1 -
.../metric/HttpForwardingMetricsServer.java | 1 -
.../jvm/backtype/storm/metric/IEventLogger.java | 25 +-
.../storm/metric/LoggingMetricsConsumer.java | 1 -
.../storm/metric/MetricsConsumerBolt.java | 1 -
.../jvm/backtype/storm/metric/SystemBolt.java | 5 -
.../backtype/storm/metric/api/CountMetric.java | 2 -
.../backtype/storm/metric/api/MeanReducer.java | 4 +-
.../storm/metric/api/MultiCountMetric.java | 2 +-
.../storm/metric/api/MultiReducedMetric.java | 2 +-
.../storm/metric/api/rpc/CountShellMetric.java | 3 +-
.../AbstractDNSToSwitchMapping.java | 2 +-
.../DefaultRackDNSToSwitchMapping.java | 21 +-
.../backtype/storm/nimbus/ILeaderElector.java | 23 +-
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 21 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 69 +-
.../scheduler/SchedulerAssignmentImpl.java | 15 +-
.../storm/scheduler/SupervisorDetails.java | 6 +-
.../backtype/storm/scheduler/Topologies.java | 12 +-
.../storm/scheduler/TopologyDetails.java | 30 +-
.../scheduler/multitenant/DefaultPool.java | 22 +-
.../storm/scheduler/multitenant/FreePool.java | 6 +-
.../scheduler/multitenant/IsolatedPool.java | 32 +-
.../multitenant/MultitenantScheduler.java | 6 +-
.../storm/scheduler/multitenant/Node.java | 17 +-
.../storm/scheduler/multitenant/NodePool.java | 16 +-
.../strategies/ResourceAwareStrategy.java | 69 +-
.../backtype/storm/security/auth/AuthUtils.java | 27 +-
.../auth/DefaultHttpCredentialsPlugin.java | 6 +-
.../security/auth/DefaultPrincipalToLocal.java | 1 -
.../storm/security/auth/IAuthorizer.java | 4 +-
.../security/auth/ICredentialsRenewer.java | 3 +-
.../security/auth/IHttpCredentialsPlugin.java | 2 -
.../storm/security/auth/IPrincipalToLocal.java | 2 +-
.../storm/security/auth/ITransportPlugin.java | 4 -
.../security/auth/KerberosPrincipalToLocal.java | 2 +-
.../storm/security/auth/ReqContext.java | 11 +-
.../security/auth/SaslTransportPlugin.java | 12 +-
.../security/auth/ShellBasedGroupsMapping.java | 10 +-
.../security/auth/SimpleTransportPlugin.java | 6 +-
.../security/auth/SingleUserPrincipal.java | 5 +-
.../storm/security/auth/TBackoffConnect.java | 1 -
.../storm/security/auth/ThriftClient.java | 10 +-
.../storm/security/auth/ThriftServer.java | 6 +-
.../auth/authorizer/DRPCAuthorizerBase.java | 2 +-
.../authorizer/DRPCSimpleACLAuthorizer.java | 19 +-
.../auth/authorizer/DenyAuthorizer.java | 11 +-
.../authorizer/ImpersonationAuthorizer.java | 17 +-
.../auth/authorizer/NoopAuthorizer.java | 7 +-
.../auth/authorizer/SimpleACLAuthorizer.java | 26 +-
.../authorizer/SimpleWhitelistAuthorizer.java | 11 +-
.../auth/digest/ClientCallbackHandler.java | 2 -
.../auth/digest/DigestSaslTransportPlugin.java | 2 -
.../auth/digest/ServerCallbackHandler.java | 5 +-
.../storm/security/auth/kerberos/AutoTGT.java | 10 +-
.../security/auth/kerberos/NoOpTTrasport.java | 20 +-
.../serialization/BlowfishTupleSerializer.java | 6 +-
.../GzipThriftSerializationDelegate.java | 1 -
.../storm/serialization/ITupleDeserializer.java | 1 -
.../serialization/KryoTupleDeserializer.java | 3 -
.../serialization/KryoValuesDeserializer.java | 3 +-
.../serialization/SerializationFactory.java | 23 +-
.../jvm/backtype/storm/spout/ShellSpout.java | 4 +-
.../storm/task/GeneralTopologyContext.java | 15 +-
.../src/jvm/backtype/storm/task/ShellBolt.java | 42 +-
.../backtype/storm/task/TopologyContext.java | 9 +-
.../AlternateRackDNSToSwitchMapping.java | 17 +
.../storm/testing/MemoryTransactionalSpout.java | 9 +-
.../testing/OpaqueMemoryTransactionalSpout.java | 8 +-
.../storm/testing/TupleCaptureBolt.java | 4 +-
.../storm/topology/BasicBoltExecutor.java | 2 +-
.../storm/topology/OutputFieldsGetter.java | 2 +-
.../storm/topology/TopologyBuilder.java | 16 +-
.../storm/topology/base/BaseBatchBolt.java | 1 -
.../topology/base/BaseTransactionalSpout.java | 1 -
.../TransactionalSpoutBatchExecutor.java | 4 +-
.../TransactionalSpoutCoordinator.java | 2 +-
...uePartitionedTransactionalSpoutExecutor.java | 13 +-
.../PartitionedTransactionalSpoutExecutor.java | 2 +-
.../src/jvm/backtype/storm/tuple/Fields.java | 10 +-
.../src/jvm/backtype/storm/tuple/MessageId.java | 10 +-
.../src/jvm/backtype/storm/tuple/Tuple.java | 1 -
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 10 +-
.../jvm/backtype/storm/utils/DRPCClient.java | 1 -
.../backtype/storm/utils/DisruptorQueue.java | 5 +-
.../backtype/storm/utils/InprocMessaging.java | 4 +-
.../storm/utils/KeyedRoundRobinQueue.java | 6 +-
.../jvm/backtype/storm/utils/ListDelegate.java | 6 +-
.../jvm/backtype/storm/utils/LocalState.java | 22 +-
.../src/jvm/backtype/storm/utils/Monitor.java | 3 +-
.../jvm/backtype/storm/utils/NimbusClient.java | 10 +-
.../storm/utils/RegisteredGlobalState.java | 6 +-
.../jvm/backtype/storm/utils/RotatingMap.java | 2 +-
.../backtype/storm/utils/ServiceRegistry.java | 2 +-
.../jvm/backtype/storm/utils/ShellProcess.java | 6 +-
.../jvm/backtype/storm/utils/ShellUtils.java | 2 +-
.../StormBoundedExponentialBackoffRetry.java | 3 +-
.../src/jvm/backtype/storm/utils/Time.java | 4 +-
.../backtype/storm/utils/TransferDrainer.java | 17 +-
.../src/jvm/backtype/storm/utils/Utils.java | 19 +-
.../jvm/backtype/storm/utils/VersionInfo.java | 2 +-
.../storm/validation/ConfigValidation.java | 113 +-
.../src/jvm/storm/trident/TridentTopology.java | 87 +-
.../trident/drpc/ReturnResultsReducer.java | 4 +-
.../fluent/ChainedAggregatorDeclarer.java | 8 +-
.../jvm/storm/trident/graph/GraphGrouper.java | 13 +-
.../src/jvm/storm/trident/graph/Group.java | 23 +-
.../trident/operation/builtin/SnapshotGet.java | 4 +-
.../operation/builtin/TupleCollectionGet.java | 6 +-
.../storm/trident/partition/GlobalGrouping.java | 5 +-
.../trident/partition/IdentityGrouping.java | 8 +-
.../src/jvm/storm/trident/planner/Node.java | 5 +-
.../storm/trident/planner/PartitionNode.java | 2 -
.../storm/trident/planner/SubtopologyBolt.java | 19 +-
.../processor/MultiReducerProcessor.java | 2 +-
.../OpaquePartitionedTridentSpoutExecutor.java | 10 +-
.../trident/spout/TridentSpoutExecutor.java | 4 +-
.../trident/topology/TridentBoltExecutor.java | 6 +-
.../topology/TridentTopologyBuilder.java | 23 +-
.../storm/trident/tuple/TridentTupleView.java | 18 +-
.../src/native/worker-launcher/impl/main.c | 10 +
.../worker-launcher/impl/worker-launcher.c | 49 +-
.../worker-launcher/impl/worker-launcher.h | 2 +
storm-core/src/py/storm/Nimbus-remote | 21 +
storm-core/src/py/storm/Nimbus.py | 595 ++
storm-core/src/py/storm/ttypes.py | 433 +-
storm-core/src/storm.thrift | 38 +
storm-core/src/ui/public/component.html | 167 +-
.../src/ui/public/deep_search_result.html | 155 +
storm-core/src/ui/public/images/search.png | Bin 0 -> 2354 bytes
.../src/ui/public/js/typeahead.jquery.min.js | 7 +
storm-core/src/ui/public/logviewer_search.html | 65 +
storm-core/src/ui/public/search_result.html | 100 +
.../templates/component-page-template.html | 53 +
.../deep-search-result-page-template.html | 66 +
.../logviewer-search-page-template.html | 44 +
.../templates/search-result-page-template.html | 60 +
.../templates/topology-page-template.html | 11 +
.../src/ui/public/templates/user-template.html | 17 +-
storm-core/src/ui/public/topology.html | 8 +-
.../test/clj/backtype/storm/grouping_test.clj | 90 +-
.../clj/backtype/storm/integration_test.clj | 4 +-
.../test/clj/backtype/storm/logviewer_test.clj | 418 +
.../storm/messaging/netty_integration_test.clj | 3 +-
.../storm/messaging/netty_unit_test.clj | 217 +-
.../test/clj/backtype/storm/messaging_test.clj | 3 +-
.../test/clj/backtype/storm/nimbus_test.clj | 3 +-
.../storm/pacemaker_state_factory_test.clj | 15 +
.../clj/org/apache/storm/pacemaker_test.clj | 15 +
.../storm/utils/DisruptorQueueTest.java | 9 +-
storm-dist/binary/src/main/assembly/binary.xml | 5 -
293 files changed, 16992 insertions(+), 5462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/bin/storm.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index b1eb0d0,cf1ece6..35aa8c8
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -17,12 -17,12 +17,12 @@@
(ns backtype.storm.cluster
(:import [org.apache.zookeeper.data Stat ACL Id]
[backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
- LogConfig]
+ LogConfig ProfileAction ProfileRequest NodeInfo]
[java.io Serializable])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
- (:import [org.apache.curator.framework.state ConnectionStateListener ConnectionState])
(:import [org.apache.curator.framework CuratorFramework])
(:import [backtype.storm.utils Utils])
+ (:import [backtype.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
(:import [java.security MessageDigest])
(:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
(:import [backtype.storm.nimbus NimbusInfo])
@@@ -376,6 -509,48 +388,48 @@@
[this storm-id log-config]
(.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))
+ (set-worker-profile-request
+ [this storm-id profile-request]
+ (let [request-type (.get_action profile-request)
+ host (.get_node (.get_nodeInfo profile-request))
+ port (first (.get_port (.get_nodeInfo profile-request)))]
+ (.set_data cluster-state
+ (profiler-config-path storm-id host port request-type)
+ (Utils/serialize profile-request)
+ acls)))
+
+ (get-topology-profile-requests
+ [this storm-id thrift?]
+ (let [path (profiler-config-path storm-id)
- requests (if (exists-node? cluster-state path false)
++ requests (if (.node_exists cluster-state path false)
+ (dofor [c (.get_children cluster-state path false)]
+ (let [raw (.get_data cluster-state (str path "/" c) false)
+ request (maybe-deserialize raw ProfileRequest)]
+ (if thrift?
+ request
+ (clojurify-profile-request request)))))]
+ requests))
+
+ (delete-topology-profile-requests
+ [this storm-id profile-request]
+ (let [profile-request-inst (thriftify-profile-request profile-request)
+ action (:action profile-request)
+ host (:host profile-request)
+ port (:port profile-request)]
+ (.delete_node cluster-state
+ (profiler-config-path storm-id host port action))))
+
+ (get-worker-profile-requests
+ [this storm-id node-info thrift?]
+ (let [host (:host node-info)
+ port (:port node-info)
+ profile-requests (get-topology-profile-requests this storm-id thrift?)]
+ (if thrift?
+ (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo %)))))
+ profile-requests)
+ (filter #(and (= host (:host %)) (= port (:port %)))
+ profile-requests))))
+
(worker-heartbeat!
[this storm-id node port info]
(let [thrift-worker-hb (thriftify-zk-worker-hb info)]
@@@ -482,10 -657,11 +536,11 @@@
(remove-storm!
[this storm-id]
- (delete-node cluster-state (assignment-path storm-id))
- (delete-node cluster-state (code-distributor-path storm-id))
- (delete-node cluster-state (credentials-path storm-id))
- (delete-node cluster-state (log-config-path storm-id))
- (delete-node cluster-state (profiler-config-path storm-id))
+ (.delete_node cluster-state (assignment-path storm-id))
+ (.delete_node cluster-state (code-distributor-path storm-id))
+ (.delete_node cluster-state (credentials-path storm-id))
+ (.delete_node cluster-state (log-config-path storm-id))
++ (.delete_node cluster-state (profiler-config-path storm-id))
(remove-storm-base! this storm-id))
(set-credentials!
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/config.clj
index 89b4a30,1a5cb51..0700a37
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@@ -265,7 -273,16 +273,22 @@@
[conf id]
(LocalState. (worker-heartbeats-root conf id)))
+(defn override-login-config-with-system-property [conf]
+ (if-let [login_conf_file (System/getProperty "java.security.auth.login.config")]
+ (assoc conf "java.security.auth.login.config" login_conf_file)
+ conf))
++
+ (defn get-topo-logs-users
+ [topology-conf]
+ (sort (distinct (remove nil?
+ (concat
+ (topology-conf LOGS-USERS)
+ (topology-conf TOPOLOGY-USERS))))))
+
+ (defn get-topo-logs-groups
+ [topology-conf]
+ (sort (distinct (remove nil?
+ (concat
+ (topology-conf LOGS-GROUPS)
+ (topology-conf TOPOLOGY-GROUPS))))))
++
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index bffd953,e4507f5..769d010
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@@ -39,11 -39,10 +39,10 @@@ public enum ControlMessage implements I
}
/**
- * Return a control message per an encoded status code
- * @param encoded
- * @return
+ * @param encoded status code
+ * @return a control message per an encoded status code
*/
- static ControlMessage mkMessage(short encoded) {
+ public static ControlMessage mkMessage(short encoded) {
for(ControlMessage cm: ControlMessage.values()) {
if(encoded == cm.code) return cm;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index 2fe5c2d,d7a86d1..70e7089
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@@ -27,14 -24,7 +27,13 @@@ import org.slf4j.LoggerFactory
/**
* Send and receive SASL tokens.
*/
-
-public class SaslMessageToken {
+public class SaslMessageToken implements INettySerializable {
+ public static final short IDENTIFIER = -500;
+
+ /** Class logger */
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SaslMessageToken.class);
+
/** Used for client or server's token to send or receive from each other. */
private byte[] token;
@@@ -92,9 -82,8 +91,9 @@@
if (token != null)
payload_len = token.length;
-
- bout.writeShort(identifier);
- bout.writeInt(payload_len);
+ bout.writeShort(IDENTIFIER);
+ bout.writeInt((int) payload_len);
++
if (payload_len > 0) {
bout.write(token);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 15fe9fb,5ce90a3..2836e80
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@@ -121,9 -121,7 +119,8 @@@ public class SaslStormServerHandler ext
LOG.debug("Removing SaslServerHandler from pipeline since SASL "
+ "authentication is complete.");
ctx.getPipeline().remove(this);
+ server.authenticated(channel);
}
- return;
} else {
// Client should not be sending other-than-SASL messages before
// SaslServerHandler has removed itself from the pipeline. Such
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 719c84c,32c2bd7..5f23064
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@@ -17,20 -17,6 +17,21 @@@
*/
package backtype.storm.messaging.netty;
+import backtype.storm.Config;
++import backtype.storm.grouping.Load;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.metric.api.IStatefulObject;
+import backtype.storm.serialization.KryoValuesSerializer;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@@ -46,7 -32,23 +47,8 @@@ import java.util.concurrent.LinkedBlock
import java.util.concurrent.ThreadFactory;
import java.io.IOException;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
-import backtype.storm.Config;
-import backtype.storm.grouping.Load;
import backtype.storm.messaging.ConnectionWithStatus;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.serialization.KryoValuesSerializer;
-import backtype.storm.utils.Utils;
class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer {
@@@ -54,10 -56,10 +56,10 @@@
@SuppressWarnings("rawtypes")
Map storm_conf;
int port;
- private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<String, AtomicInteger>();
+ private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
private final AtomicInteger messagesDequeued = new AtomicInteger(0);
private final AtomicInteger[] pendingMessages;
-
+
// Create multiple queues for incoming messages. The size equals the number of receiver threads.
// For message which is sent to same task, it will be stored in the same queue to preserve the message order.
private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
@@@ -79,18 -81,18 +81,18 @@@
this.storm_conf = storm_conf;
this.port = port;
_ser = new KryoValuesSerializer(storm_conf);
-
+
queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
roundRobinQueueId = 0;
- taskToQueueId = new HashMap<Integer, Integer>();
+ taskToQueueId = new HashMap<>();
-
+
message_queue = new LinkedBlockingQueue[queueCount];
pendingMessages = new AtomicInteger[queueCount];
for (int i = 0; i < queueCount; i++) {
- message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+ message_queue[i] = new LinkedBlockingQueue<>();
pendingMessages[i] = new AtomicInteger(0);
}
-
+
// Configure the server.
int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
int backlog = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500);
@@@ -237,18 -236,17 +236,17 @@@
}
return null;
}
-
+
/**
* register a newly created channel
- * @param channel
+ * @param channel newly created channel
*/
protected void addChannel(Channel channel) {
allChannels.add(channel);
}
-
+
/**
- * close a channel
- * @param channel
+ * @param channel channel to close
*/
public void closeChannel(Channel channel) {
channel.close().awaitUninterruptibly();
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 2c8eae9,877b6d8..696a2fc
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@@ -51,15 -51,31 +51,31 @@@ public class StormClientHandler extend
//examine the response message from server
Object message = event.getMessage();
if (message instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage)message;
- if (msg==ControlMessage.FAILURE_RESPONSE)
- LOG.info("failure response:{}", msg);
-
+ ControlMessage msg = (ControlMessage)message;
+ if (msg==ControlMessage.FAILURE_RESPONSE) {
+ LOG.info("failure response:{}", msg);
+ }
+ } else if (message instanceof List) {
+ try {
+ //This should be the metrics, and there should only be one of them
+ List<TaskMessage> list = (List<TaskMessage>)message;
+ if (list.size() < 1) throw new RuntimeException("Didn't see enough load metrics ("+client.getDstAddress()+") "+list);
+ if (list.size() != 1) LOG.warn("Messages are not being delivered fast enough, got "+list.size()+" metrics messages at once("+client.getDstAddress()+")");
+ TaskMessage tm = ((List<TaskMessage>)message).get(list.size() - 1);
+ if (tm.task() != -1) throw new RuntimeException("Metrics messages are sent to the system task ("+client.getDstAddress()+") "+tm);
+ List metrics = _des.deserialize(tm.message());
+ if (metrics.size() < 1) throw new RuntimeException("No metrics data in the metrics message ("+client.getDstAddress()+") "+metrics);
+ if (!(metrics.get(0) instanceof Map)) throw new RuntimeException("The metrics did not have a map in the first slot ("+client.getDstAddress()+") "+metrics);
+ client.setLoadMetrics((Map<Integer, Double>)metrics.get(0));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
} else {
- throw new RuntimeException("Don't know how to handle a message of type "+message+" ("+client.getDstAddress()+")");
+ throw new RuntimeException("Don't know how to handle a message of type "
+ + message + " (" + client.getDstAddress() + ")");
}
}
-
+
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
client.notifyInterestChanged(e.getChannel());
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
index cd79f4e,8062b4e..943199c
--- a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
@@@ -34,11 -32,7 +32,10 @@@ import java.net.URI
import java.util.Collection;
import java.util.Set;
import java.util.HashSet;
+import java.util.HashMap;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
- import java.util.concurrent.ExecutorService;
public class AuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@@ -77,39 -68,8 +74,39 @@@
}
/**
+ * Pull every option of the named app configuration entries out of a Configuration.
+ * @param conf The config to pull the key/value pairs out of; a null conf yields a null result.
+ * @param conf_entry The app configuration entry name to get the options from.
+ * @return A sorted map of option name to value (later entries overwrite earlier ones), or null if conf is null.
+ * @throws IOException if conf has no entry named conf_entry.
+ */
+ public static SortedMap<String, ?> PullConfig(Configuration conf,
+ String conf_entry) throws IOException {
+ if(conf == null) {
+ return null;
+ }
+ AppConfigurationEntry configurationEntries[] = conf.getAppConfigurationEntry(conf_entry);
+ if(configurationEntries == null) {
+ String errorMessage = "Could not find a '" + conf_entry
+ + "' entry in this configuration: Client cannot start.";
+ throw new IOException(errorMessage);
+ }
+
+ TreeMap<String, Object> results = new TreeMap<>();
+
+
+ for(AppConfigurationEntry entry: configurationEntries) { // merge the options of all entries into one map
+ Map<String, ?> options = entry.getOptions();
+ for(String key : options.keySet()) {
+ results.put(key, options.get(key));
+ }
+ }
+ return results;
+ }
+
+ /**
* Construct a principal to local plugin
- * @param conf storm configuration
+ * @param storm_conf storm configuration
* @return the plugin
*/
public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/src/storm.thrift
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
index 5024922,0000000..813ae84
mode 100644,000000..100644
--- a/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
@@@ -1,135 -1,0 +1,150 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements. See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership. The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License. You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
+(ns org.apache.storm.pacemaker-state-factory-test
+ (:require [clojure.test :refer :all]
+ [conjure.core :refer :all]
+ [org.apache.storm.pacemaker [pacemaker-state-factory :as psf]])
+ (:import [backtype.storm.generated
+ HBExecutionException HBNodes HBRecords
+ HBServerMessageType HBMessage HBMessageData HBPulse]
+ [backtype.storm.cluster ClusterStateContext]
+ [org.mockito Mockito Matchers]))
+
+(defn- string-to-bytes [string] ; test helper: string -> byte array
+ (byte-array (map int string))) ; int value of each character, coerced into a byte array
+
+(defn- bytes-to-string [bytez] ; test helper: byte sequence -> string
+ (apply str (map char bytez))) ; each byte becomes a char, then all chars are concatenated
+
+(defprotocol send-capture ; protocol implemented by the stub client built in make-send-capture
+ (send [this something]) ; hand a message to the stub
+ (check-captured [this])) ; read back what the stub recorded
+
+(defn- make-send-capture [response] ; stub client whose send always returns response
+ (let [captured (atom nil)] ; holds the most recent argument passed to send (nil until the first call)
+ (reify send-capture
+ (send [this something] (reset! captured something) response) ; record the message, then return the canned response
+ (check-captured [this] @captured))))
+
+(defmacro with-mock-pacemaker-client-and-state [client state response & body] ; runs body with the client and state symbols bound
+ `(let [~client (make-send-capture ~response)]
+ (stubbing [psf/makeZKState nil ; conjure stubs: makeZKState is replaced to yield nil,
+ psf/makeClient ~client] ; and makeClient to yield the capturing stub
+ (let [~state (psf/-mkState nil nil nil nil (ClusterStateContext.))] ; state under test, created while the stubs are active
+ ~@body))))
+
+
+(deftest pacemaker_state_set_worker_hb
+ (testing "set_worker_hb" ; success path: the stub replies with SEND_PULSE_RESPONSE
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil)
+
+ (.set_worker_hb state "/foo" (string-to-bytes "data") nil)
+ (let [sent (.check-captured client) ; the message the state handed to the client
+ pulse (.get_pulse (.get_data sent))]
+ (is (= (.get_type sent) HBServerMessageType/SEND_PULSE))
+ (is (= (.get_id pulse) "/foo"))
+ (is (= (bytes-to-string (.get_details pulse)) "data")))))
+
+ (testing "set_worker_hb" ; NOTE(review): label repeats the case above, but this one is the failure path
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/SEND_PULSE nil) ; reply type is not SEND_PULSE_RESPONSE
+
+ (is (thrown? HBExecutionException
+ (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
+
+
+
+(deftest pacemaker_state_delete_worker_hb
+ (testing "delete_worker_hb" ; success path: the stub replies with DELETE_PATH_RESPONSE
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil)
+
+ (.delete_worker_hb state "/foo/bar")
+ (let [sent (.check-captured client)] ; the message the state handed to the client
+ (is (= (.get_type sent) HBServerMessageType/DELETE_PATH))
+ (is (= (.get_path (.get_data sent)) "/foo/bar")))))
+
+ (testing "delete_worker_hb" ; NOTE(review): label repeats the case above, but this one is the failure path
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/DELETE_PATH nil) ; reply type is not DELETE_PATH_RESPONSE
+
+ (is (thrown? HBExecutionException
+ (.delete_worker_hb state "/foo/bar"))))))
+
+(deftest pacemaker_state_get_worker_hb
+ (testing "get_worker_hb" ; success path: the stub replies with GET_PULSE_RESPONSE carrying a pulse
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE
+ (HBMessageData/pulse
+ (doto (HBPulse.)
+ (.set_id "/foo")
+ (.set_details (string-to-bytes "some data")))))
+
+ (.get_worker_hb state "/foo" false)
+ (let [sent (.check-captured client)] ; only the outgoing request is checked here, not the returned value
+ (is (= (.get_type sent) HBServerMessageType/GET_PULSE))
+ (is (= (.get_path (.get_data sent)) "/foo")))))
+
+ (testing "get_worker_hb - fail (bad response)"
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/GET_PULSE nil) ; reply type is not GET_PULSE_RESPONSE
+
+ (is (thrown? HBExecutionException
+ (.get_worker_hb state "/foo" false)))))
+
+ (testing "get_worker_hb - fail (bad data)"
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE nil) ; right reply type, but no data attached
+
+ (is (thrown? HBExecutionException
+ (.get_worker_hb state "/foo" false))))))
+
+(deftest pacemaker_state_get_worker_hb_children
+ (testing "get_worker_hb_children" ; success path: the stub replies with an empty node list
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
+ (HBMessageData/nodes
+ (HBNodes. [])))
+
+ (.get_worker_hb_children state "/foo" false)
+ (let [sent (.check-captured client)] ; only the outgoing request is checked here, not the returned value
+ (is (= (.get_type sent) HBServerMessageType/GET_ALL_NODES_FOR_PATH))
+ (is (= (.get_path (.get_data sent)) "/foo")))))
+
+ (testing "get_worker_hb_children - fail (bad response)"
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/DELETE_PATH nil) ; reply type is not GET_ALL_NODES_FOR_PATH_RESPONSE
+
+ (is (thrown? HBExecutionException
+ (.get_worker_hb_children state "/foo" false)))))
+
+ (testing "get_worker_hb_children - fail (bad data)"
+ (with-mock-pacemaker-client-and-state
+ client state
+ (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE nil) ; right reply type, but no data attached
+
+ (is (thrown? HBExecutionException
+ (.get_worker_hb_children state "/foo" false))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/62d725a8/storm-core/test/clj/org/apache/storm/pacemaker_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_test.clj
index ca7c693,0000000..4c5359f
mode 100644,000000..100644
--- a/storm-core/test/clj/org/apache/storm/pacemaker_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_test.clj
@@@ -1,227 -1,0 +1,242 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements. See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership. The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License. You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
+(ns org.apache.storm.pacemaker-test
+ (:require [clojure.test :refer :all]
+ [org.apache.storm.pacemaker [pacemaker :as pacemaker]]
+ [conjure.core :as conjure])
+ (:import [backtype.storm.generated
+ HBExecutionException HBNodes HBRecords
+ HBServerMessageType HBMessage HBMessageData HBPulse]))
+
+(defn- message-with-rand-id [type data] ; returns the pair [message mid]
+ (let [mid (rand-int 1000) ; random message id in [0, 1000)
+ message (HBMessage. type data)]
+ (.set_message_id message mid)
+ [message mid]))
+
+(defn- string-to-bytes [string] ; test helper: string -> byte array
+ (byte-array (map int string))) ; int value of each character, coerced into a byte array
+
+(defn- bytes-to-string [bytez] ; test helper: byte sequence -> string
+ (apply str (map char bytez))) ; each byte becomes a char, then all chars are concatenated
+
+(defn- makenode [handler path] ; sends a SEND_PULSE for path; the tests use it to set up existing nodes
+ (.handleMessage handler
+ (HBMessage.
+ HBServerMessageType/SEND_PULSE
+ (HBMessageData/pulse
+ (doto (HBPulse.)
+ (.set_id path)
+ (.set_details (string-to-bytes "nothing"))))) ; fixed payload; pacemaker-server-get-pulse reads it back
+ true)) ; NOTE(review): presumably an "authenticated" flag; tests passing false expect NOT_AUTHORIZED. Confirm against handleMessage
+
+(deftest pacemaker-server-create-path
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})] ; handler built from an empty config map
+ (testing "CREATE_PATH"
+ (let [[message mid] (message-with-rand-id
+ HBServerMessageType/CREATE_PATH
+ (HBMessageData/path "/testpath"))
+ response (.handleMessage handler message true)]
+ (is (= (.get_message_id response) mid)) ; the response must echo the request's message id
+ (is (= (.get_type response) HBServerMessageType/CREATE_PATH_RESPONSE))
+ (is (= (.get_data response) nil)))))))
+
+(deftest pacemaker-server-exists
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})] ; one handler is shared by both testing blocks below
+ (testing "EXISTS - false" ; nothing has been written to /testpath yet
+ (let [[message mid] (message-with-rand-id HBServerMessageType/EXISTS
+ (HBMessageData/path "/testpath"))
+ bad-response (.handleMessage handler message false) ; same message, third argument false
+ good-response (.handleMessage handler message true)]
+ (is (= (.get_message_id bad-response) mid))
+ (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
+
+ (is (= (.get_message_id good-response) mid))
+ (is (= (.get_type good-response) HBServerMessageType/EXISTS_RESPONSE))
+ (is (= (.get_boolval (.get_data good-response)) false))))
+
+ (testing "EXISTS - true" ; a pulse is sent to the path first, then EXISTS is queried
+ (let [path "/exists_path"
+ data-string "pulse data"]
+ (let [[send _] (message-with-rand-id
+ HBServerMessageType/SEND_PULSE
+ (HBMessageData/pulse
+ (doto (HBPulse.)
+ (.set_id path)
+ (.set_details (string-to-bytes data-string)))))
+ _ (.handleMessage handler send true) ; result ignored; evaluated only for its effect on the handler
+ [message mid] (message-with-rand-id HBServerMessageType/EXISTS
+ (HBMessageData/path path))
+ bad-response (.handleMessage handler message false)
+ good-response (.handleMessage handler message true)]
+ (is (= (.get_message_id bad-response) mid))
+ (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
+
+ (is (= (.get_message_id good-response) mid))
+ (is (= (.get_type good-response) HBServerMessageType/EXISTS_RESPONSE))
+ (is (= (.get_boolval (.get_data good-response)) true))))))))
+
+(deftest pacemaker-server-send-pulse-get-pulse
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})]
+ (testing "SEND_PULSE - GET_PULSE" ; round trip: write a pulse, then read the same path back
+ (let [path "/pulsepath"
+ data-string "pulse data"]
+ (let [[message mid] (message-with-rand-id
+ HBServerMessageType/SEND_PULSE
+ (HBMessageData/pulse
+ (doto (HBPulse.)
+ (.set_id path)
+ (.set_details (string-to-bytes data-string)))))
+ response (.handleMessage handler message true)]
+ (is (= (.get_message_id response) mid))
+ (is (= (.get_type response) HBServerMessageType/SEND_PULSE_RESPONSE))
+ (is (= (.get_data response) nil)))
+ (let [[message mid] (message-with-rand-id ; second request, with its own random id
+ HBServerMessageType/GET_PULSE
+ (HBMessageData/path path))
+ response (.handleMessage handler message true)]
+ (is (= (.get_message_id response) mid))
+ (is (= (.get_type response) HBServerMessageType/GET_PULSE_RESPONSE))
+ (is (= (bytes-to-string (.get_details (.get_pulse (.get_data response)))) data-string)))))))) ; details must match what was sent
+
+(deftest pacemaker-server-get-all-pulse-for-path
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})]
+ (testing "GET_ALL_PULSE_FOR_PATH"
+ (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_PULSE_FOR_PATH
+ (HBMessageData/path "/testpath"))
+ bad-response (.handleMessage handler message false) ; same message, third argument false
+ good-response (.handleMessage handler message true)]
+ (is (= (.get_message_id bad-response) mid))
+ (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
+
+ (is (= (.get_message_id good-response) mid))
+ (is (= (.get_type good-response) HBServerMessageType/GET_ALL_PULSE_FOR_PATH_RESPONSE))
+ (is (= (.get_data good-response) nil))))))) ; the accepted response is expected to carry no data
+
+(deftest pacemaker-server-get-all-nodes-for-path
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})]
+ (testing "GET_ALL_NODES_FOR_PATH"
+ (makenode handler "/some-root-path/foo") ; four nodes directly under /some-root-path
+ (makenode handler "/some-root-path/bar")
+ (makenode handler "/some-root-path/baz")
+ (makenode handler "/some-root-path/boo")
+ (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH
+ (HBMessageData/path "/some-root-path"))
+ bad-response (.handleMessage handler message false) ; same message, third argument false
+ good-response (.handleMessage handler message true)
+ ids (into #{} (.get_pulseIds (.get_nodes (.get_data good-response))))] ; returned ids as a set
+ (is (= (.get_message_id bad-response) mid))
+ (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
+
+ (is (= (.get_message_id good-response) mid))
+ (is (= (.get_type good-response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
+ (is (contains? ids "foo")) ; ids are expected as the last path segment, not the full path
+ (is (contains? ids "bar"))
+ (is (contains? ids "baz"))
+ (is (contains? ids "boo")))
+
+ (makenode handler "/some/deeper/path/foo") ; same check with a more deeply nested parent path
+ (makenode handler "/some/deeper/path/bar")
+ (makenode handler "/some/deeper/path/baz")
+ (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH
+ (HBMessageData/path "/some/deeper/path"))
+ bad-response (.handleMessage handler message false)
+ good-response (.handleMessage handler message true)
+ ids (into #{} (.get_pulseIds (.get_nodes (.get_data good-response))))]
+ (is (= (.get_message_id bad-response) mid))
+ (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
+
+ (is (= (.get_message_id good-response) mid))
+ (is (= (.get_type good-response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
+ (is (contains? ids "foo"))
+ (is (contains? ids "bar"))
+ (is (contains? ids "baz")))))))
+
+(deftest pacemaker-server-get-pulse
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})]
+ (testing "GET_PULSE"
+ (makenode handler "/some-root/GET_PULSE") ; makenode stores the details "nothing"
+ (let [[message mid] (message-with-rand-id HBServerMessageType/GET_PULSE
+ (HBMessageData/path "/some-root/GET_PULSE"))
+ bad-response (.handleMessage handler message false) ; same message, third argument false
+ good-response (.handleMessage handler message true)
+ good-pulse (.get_pulse (.get_data good-response))]
+ (is (= (.get_message_id bad-response) mid))
+ (is (= (.get_type bad-response) HBServerMessageType/NOT_AUTHORIZED))
+ (is (= (.get_data bad-response) nil)) ; the rejected response must not carry the pulse
+
+ (is (= (.get_message_id good-response) mid))
+ (is (= (.get_type good-response) HBServerMessageType/GET_PULSE_RESPONSE))
+ (is (= (.get_id good-pulse) "/some-root/GET_PULSE")) ; pulse id is expected to be the full path
+ (is (= (bytes-to-string (.get_details good-pulse)) "nothing")))))))
+
+(deftest pacemaker-server-delete-path
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})]
+ (testing "DELETE_PATH"
+ (makenode handler "/some-root/DELETE_PATH/foo") ; four nodes under the path that will be deleted
+ (makenode handler "/some-root/DELETE_PATH/bar")
+ (makenode handler "/some-root/DELETE_PATH/baz")
+ (makenode handler "/some-root/DELETE_PATH/boo")
+ (let [[message mid] (message-with-rand-id HBServerMessageType/DELETE_PATH
+ (HBMessageData/path "/some-root/DELETE_PATH"))
+ response (.handleMessage handler message true)]
+ (is (= (.get_message_id response) mid))
+ (is (= (.get_type response) HBServerMessageType/DELETE_PATH_RESPONSE))
+ (is (= (.get_data response) nil)))
+ (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH ; list the parent again after the delete
+ (HBMessageData/path "/some-root/DELETE_PATH"))
+ response (.handleMessage handler message true)
+ ids (into #{} (.get_pulseIds (.get_nodes (.get_data response))))]
+ (is (= (.get_message_id response) mid))
+ (is (= (.get_type response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
+ (is (empty? ids))))))) ; every child created above must be gone
+
+(deftest pacemaker-server-delete-pulse-id
+ (conjure/stubbing
+ [pacemaker/register nil] ; pacemaker/register is stubbed to return nil for the duration of the test
+ (let [handler (pacemaker/mk-handler {})]
+ (testing "DELETE_PULSE_ID"
+ (makenode handler "/some-root/DELETE_PULSE_ID/foo") ; four sibling nodes; only foo is deleted below
+ (makenode handler "/some-root/DELETE_PULSE_ID/bar")
+ (makenode handler "/some-root/DELETE_PULSE_ID/baz")
+ (makenode handler "/some-root/DELETE_PULSE_ID/boo")
+ (let [[message mid] (message-with-rand-id HBServerMessageType/DELETE_PULSE_ID
+ (HBMessageData/path "/some-root/DELETE_PULSE_ID/foo"))
+ response (.handleMessage handler message true)]
+ (is (= (.get_message_id response) mid))
+ (is (= (.get_type response) HBServerMessageType/DELETE_PULSE_ID_RESPONSE))
+ (is (= (.get_data response) nil)))
+ (let [[message mid] (message-with-rand-id HBServerMessageType/GET_ALL_NODES_FOR_PATH ; list the parent again after the delete
+ (HBMessageData/path "/some-root/DELETE_PULSE_ID"))
+ response (.handleMessage handler message true)
+ ids (into #{} (.get_pulseIds (.get_nodes (.get_data response))))]
+ (is (= (.get_message_id response) mid))
+ (is (= (.get_type response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE))
+ (is (not (contains? ids "foo")))))))) ; NOTE(review): only foo's absence is asserted; the surviving siblings are not checked