You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/05 23:01:02 UTC
[30/37] storm git commit: Merge remote-tracking branch 'apache/master' into storm-820-agg-stats-on-nimbus
Merge remote-tracking branch 'apache/master' into storm-820-agg-stats-on-nimbus
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5d847945
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5d847945
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5d847945
Branch: refs/heads/master
Commit: 5d84794521b2f581d2ccf3b144f8c73cfb277e75
Parents: 5266a16 99285bb
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Tue Sep 29 13:52:53 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Tue Sep 29 13:52:53 2015 -0500
----------------------------------------------------------------------
.gitignore | 2 +-
.travis.yml | 14 +-
CHANGELOG.md | 343 +++--
DEVELOPER.md | 28 +-
LICENSE | 4 +-
NOTICE | 4 +-
README.markdown | 39 +-
SECURITY.md | 244 ++--
STORM-UI-REST-API.md | 72 +-
TODO | 178 ---
bin/storm-config.cmd | 18 +-
bin/storm.py | 85 +-
conf/defaults.yaml | 20 +-
conf/jaas_kerberos.conf | 17 +
conf/log4j2.xml | 32 +
conf/logback.xml | 28 -
dev-tools/github/__init__.py | 205 +--
dev-tools/jira-github-join.py | 77 +-
dev-tools/jira/__init__.py | 435 +++---
dev-tools/report/__init__.py | 14 +
dev-tools/report/formatter.py | 68 +
dev-tools/report/report.py | 252 ++++
dev-tools/report/report_builder.py | 86 ++
dev-tools/storm-merge.py | 2 +-
dev-tools/test-ns.py | 36 +-
.../print-errors-from-clojure-test-reports.py | 58 -
.../travis/print-errors-from-test-reports.py | 76 +
dev-tools/travis/save-logs.py | 54 +
dev-tools/travis/travis-build.sh | 50 -
dev-tools/travis/travis-install.sh | 37 +
dev-tools/travis/travis-script.sh | 43 +
doap_Storm.rdf | 6 +-
docs/_posts/2015-06-04-storm095-released.md | 23 +
.../2015-06-15-storm0100-beta-released.md | 294 ++++
docs/about/multi-language.md | 2 +-
docs/documentation/Concepts.md | 4 +-
docs/documentation/Documentation.md | 4 +-
docs/documentation/FAQ.md | 7 +-
.../Guaranteeing-message-processing.md | 4 +-
docs/documentation/Maven.md | 4 +-
docs/documentation/Metrics.md | 2 +-
docs/documentation/Multilang-protocol.md | 9 +-
...unning-topologies-on-a-production-cluster.md | 4 +-
.../documentation/Setting-up-a-Storm-cluster.md | 10 +-
docs/documentation/Structure-of-the-codebase.md | 2 +-
docs/documentation/Troubleshooting.md | 37 +
docs/documentation/Tutorial.md | 16 +-
...nding-the-parallelism-of-a-Storm-topology.md | 5 +-
.../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes
.../images/nimbus_ha_topology_submission.png | Bin 0 -> 134180 bytes
docs/documentation/nimbus-ha-design.md | 217 +++
docs/downloads.html | 95 +-
examples/storm-starter/README.markdown | 8 +-
examples/storm-starter/pom.xml | 31 +-
.../jvm/storm/starter/BasicDRPCTopology.java | 5 +-
.../src/jvm/storm/starter/ReachTopology.java | 4 +-
.../storm/starter/TransactionalGlobalCount.java | 5 +-
.../starter/trident/TridentKafkaWordCount.java | 230 +++
external/flux/.gitignore | 15 +
external/flux/README.md | 834 +++++++++++
external/flux/flux-core/pom.xml | 96 ++
.../main/java/org/apache/storm/flux/Flux.java | 263 ++++
.../java/org/apache/storm/flux/FluxBuilder.java | 595 ++++++++
.../apache/storm/flux/api/TopologySource.java | 39 +
.../org/apache/storm/flux/model/BeanDef.java | 39 +
.../apache/storm/flux/model/BeanReference.java | 39 +
.../org/apache/storm/flux/model/BoltDef.java | 24 +
.../storm/flux/model/ConfigMethodDef.java | 62 +
.../storm/flux/model/ExecutionContext.java | 77 ++
.../apache/storm/flux/model/GroupingDef.java | 77 ++
.../org/apache/storm/flux/model/IncludeDef.java | 54 +
.../org/apache/storm/flux/model/ObjectDef.java | 90 ++
.../apache/storm/flux/model/PropertyDef.java | 58 +
.../org/apache/storm/flux/model/SpoutDef.java | 24 +
.../org/apache/storm/flux/model/StreamDef.java | 64 +
.../apache/storm/flux/model/TopologyDef.java | 216 +++
.../storm/flux/model/TopologySourceDef.java | 36 +
.../org/apache/storm/flux/model/VertexDef.java | 36 +
.../apache/storm/flux/parser/FluxParser.java | 208 +++
.../flux-core/src/main/resources/splash.txt | 9 +
.../org/apache/storm/flux/FluxBuilderTest.java | 31 +
.../org/apache/storm/flux/IntegrationTest.java | 39 +
.../java/org/apache/storm/flux/TCKTest.java | 245 ++++
.../multilang/MultilangEnvirontmentTest.java | 89 ++
.../apache/storm/flux/test/SimpleTopology.java | 59 +
.../storm/flux/test/SimpleTopologySource.java | 52 +
.../test/SimpleTopologyWithConfigParam.java | 55 +
.../org/apache/storm/flux/test/TestBolt.java | 80 ++
.../storm/flux/test/TridentTopologySource.java | 71 +
.../src/test/resources/configs/bad_hbase.yaml | 98 ++
.../resources/configs/config-methods-test.yaml | 70 +
.../resources/configs/diamond-topology.yaml | 87 ++
.../existing-topology-method-override.yaml | 25 +
.../existing-topology-reflection-config.yaml | 24 +
.../configs/existing-topology-reflection.yaml | 24 +
.../configs/existing-topology-trident.yaml | 24 +
.../resources/configs/existing-topology.yaml | 23 +
.../src/test/resources/configs/hdfs_test.yaml | 97 ++
.../test/resources/configs/include_test.yaml | 25 +
.../configs/invalid-existing-topology.yaml | 33 +
.../src/test/resources/configs/kafka_test.yaml | 126 ++
.../src/test/resources/configs/shell_test.yaml | 104 ++
.../test/resources/configs/simple_hbase.yaml | 120 ++
.../resources/configs/substitution-test.yaml | 106 ++
.../src/test/resources/configs/tck.yaml | 95 ++
.../src/test/resources/configs/test.properties | 18 +
.../flux-core/src/test/resources/log4j2.xml | 34 +
external/flux/flux-examples/README.md | 66 +
external/flux/flux-examples/pom.xml | 141 ++
.../storm/flux/examples/WordCountClient.java | 74 +
.../apache/storm/flux/examples/WordCounter.java | 71 +
.../src/main/resources/hbase_bolt.properties | 18 +
.../src/main/resources/hdfs_bolt.properties | 26 +
.../src/main/resources/kafka_spout.yaml | 136 ++
.../src/main/resources/multilang.yaml | 89 ++
.../src/main/resources/simple_hbase.yaml | 92 ++
.../src/main/resources/simple_hdfs.yaml | 105 ++
.../src/main/resources/simple_wordcount.yaml | 68 +
external/flux/flux-ui/README.md | 3 +
external/flux/flux-wrappers/pom.xml | 51 +
.../flux/wrappers/bolts/FluxShellBolt.java | 56 +
.../storm/flux/wrappers/bolts/LogInfoBolt.java | 44 +
.../flux/wrappers/spouts/FluxShellSpout.java | 55 +
.../main/resources/resources/randomsentence.js | 93 ++
.../main/resources/resources/splitsentence.py | 24 +
external/flux/pom.xml | 119 ++
external/storm-elasticsearch/README.md | 99 ++
external/storm-elasticsearch/pom.xml | 103 ++
.../elasticsearch/ElasticsearchGetRequest.java | 36 +
.../elasticsearch/EsLookupResultOutput.java | 43 +
.../elasticsearch/bolt/AbstractEsBolt.java | 81 ++
.../storm/elasticsearch/bolt/EsIndexBolt.java | 75 +
.../storm/elasticsearch/bolt/EsLookupBolt.java | 81 ++
.../elasticsearch/bolt/EsPercolateBolt.java | 88 ++
.../common/DefaultEsTupleMapper.java | 42 +
.../storm/elasticsearch/common/EsConfig.java | 82 ++
.../elasticsearch/common/EsTupleMapper.java | 55 +
.../common/StormElasticSearchClient.java | 48 +
.../common/TransportAddresses.java | 72 +
.../storm/elasticsearch/trident/EsState.java | 113 ++
.../elasticsearch/trident/EsStateFactory.java | 55 +
.../storm/elasticsearch/trident/EsUpdater.java | 35 +
.../bolt/AbstractEsBoltIntegrationTest.java | 91 ++
.../elasticsearch/bolt/AbstractEsBoltTest.java | 65 +
.../elasticsearch/bolt/EsIndexBoltTest.java | 69 +
.../elasticsearch/bolt/EsIndexTopology.java | 120 ++
.../bolt/EsLookupBoltIntegrationTest.java | 137 ++
.../elasticsearch/bolt/EsLookupBoltTest.java | 125 ++
.../elasticsearch/bolt/EsPercolateBoltTest.java | 62 +
.../elasticsearch/common/EsConfigTest.java | 71 +
.../storm/elasticsearch/common/EsConstants.java | 22 +
.../storm/elasticsearch/common/EsTestUtil.java | 75 +
.../common/TransportAddressesTest.java | 81 ++
.../trident/EsStateFactoryTest.java | 32 +
.../trident/TridentEsTopology.java | 135 ++
external/storm-eventhubs/pom.xml | 40 +-
.../eventhubs/bolt/DefaultEventDataFormat.java | 47 +
.../storm/eventhubs/bolt/EventHubBolt.java | 56 +-
.../eventhubs/bolt/EventHubBoltConfig.java | 109 ++
.../storm/eventhubs/bolt/IEventDataFormat.java | 28 +
.../client/ConnectionStringBuilder.java | 116 --
.../storm/eventhubs/client/Constants.java | 32 -
.../storm/eventhubs/client/EventHubClient.java | 92 --
.../eventhubs/client/EventHubConsumerGroup.java | 72 -
.../eventhubs/client/EventHubException.java | 37 -
.../eventhubs/client/EventHubReceiver.java | 139 --
.../eventhubs/client/EventHubSendClient.java | 70 -
.../storm/eventhubs/client/EventHubSender.java | 95 --
.../storm/eventhubs/client/SelectorFilter.java | 38 -
.../eventhubs/client/SelectorFilterWriter.java | 64 -
.../storm/eventhubs/samples/EventCount.java | 5 +-
.../storm/eventhubs/samples/EventHubLoop.java | 9 +-
.../eventhubs/spout/EventHubReceiverFilter.java | 56 -
.../eventhubs/spout/EventHubReceiverImpl.java | 49 +-
.../storm/eventhubs/spout/EventHubSpout.java | 5 +
.../eventhubs/spout/EventHubSpoutConfig.java | 126 +-
.../eventhubs/spout/IEventHubReceiver.java | 5 +-
.../spout/IEventHubReceiverFilter.java | 35 -
.../eventhubs/spout/SimplePartitionManager.java | 11 +-
.../spout/StaticPartitionCoordinator.java | 2 +-
.../trident/OpaqueTridentEventHubSpout.java | 2 +-
.../TransactionalTridentEventHubEmitter.java | 2 +-
.../TransactionalTridentEventHubSpout.java | 2 +-
.../trident/TridentPartitionManager.java | 12 +-
.../src/main/resources/config.properties | 20 +-
.../eventhubs/spout/EventHubReceiverMock.java | 18 +-
.../spout/SpoutOutputCollectorMock.java | 5 +
.../eventhubs/spout/TestEventHubSpout.java | 4 +-
external/storm-hbase/LICENSE | 202 ---
external/storm-hbase/pom.xml | 2 +-
.../org/apache/storm/hbase/common/Utils.java | 10 +-
external/storm-hdfs/README.md | 9 +
external/storm-hdfs/pom.xml | 19 +-
.../hdfs/common/security/HdfsSecurityUtil.java | 5 +-
.../ha/codedistributor/HDFSCodeDistributor.java | 101 ++
.../apache/storm/hdfs/trident/HdfsState.java | 392 ++++--
.../trident/rotation/FileRotationPolicy.java | 14 +
.../rotation/FileSizeRotationPolicy.java | 13 +
.../hdfs/trident/rotation/NoRotationPolicy.java | 10 +
.../trident/rotation/TimedRotationPolicy.java | 31 +-
.../storm/hdfs/trident/FixedBatchSpout.java | 2 +-
.../storm/hdfs/trident/HdfsStateTest.java | 206 +++
external/storm-hive/README.md | 18 +-
external/storm-hive/pom.xml | 45 +
.../org/apache/storm/hive/bolt/HiveBolt.java | 77 +-
.../apache/storm/hive/common/HiveOptions.java | 11 +
.../apache/storm/hive/common/HiveWriter.java | 5 +-
.../storm/hive/trident/HiveStateFactory.java | 17 +
.../apache/storm/hive/trident/HiveUpdater.java | 17 +
.../apache/storm/hive/bolt/TestHiveBolt.java | 112 +-
.../storm/hive/trident/TridentHiveTopology.java | 2 +-
external/storm-jdbc/LICENSE | 202 ---
external/storm-jdbc/README.md | 76 +-
external/storm-jdbc/pom.xml | 4 +
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 17 +-
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 5 +-
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +-
.../org/apache/storm/jdbc/common/Column.java | 7 +-
.../storm/jdbc/common/ConnectionProvider.java | 43 +
.../jdbc/common/HikariCPConnectionProvider.java | 63 +
.../apache/storm/jdbc/common/JdbcClient.java | 19 +-
.../storm/jdbc/mapper/JdbcLookupMapper.java | 17 +
.../jdbc/mapper/SimpleJdbcLookupMapper.java | 17 +
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 6 +-
.../storm/jdbc/trident/state/JdbcState.java | 13 +-
.../storm/jdbc/common/JdbcClientTest.java | 5 +-
.../jdbc/topology/AbstractUserTopology.java | 17 +-
.../jdbc/topology/UserPersistanceTopology.java | 18 +-
.../UserPersistanceTridentTopology.java | 2 +-
external/storm-jdbc/src/test/sql/test.sql | 17 +
external/storm-kafka/CHANGELOG.md | 13 -
external/storm-kafka/README.md | 57 +-
external/storm-kafka/pom.xml | 18 +-
.../ExponentialBackoffMsgRetryManager.java | 25 +-
.../jvm/storm/kafka/FailedMsgRetryManager.java | 3 +
.../src/jvm/storm/kafka/KafkaSpout.java | 17 +-
.../src/jvm/storm/kafka/PartitionManager.java | 23 +-
.../src/jvm/storm/kafka/SpoutConfig.java | 3 +
.../src/jvm/storm/kafka/ZkCoordinator.java | 2 +-
.../src/jvm/storm/kafka/ZkState.java | 2 +-
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 92 +-
.../kafka/trident/OpaqueTridentKafkaSpout.java | 4 +-
.../trident/TransactionalTridentKafkaSpout.java | 4 +-
.../ExponentialBackoffMsgRetryManagerTest.java | 43 +-
.../src/test/storm/kafka/KafkaUtilsTest.java | 32 +-
.../src/test/storm/kafka/ZkCoordinatorTest.java | 1 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 106 +-
external/storm-redis/LICENSE | 202 ---
external/storm-redis/README.md | 21 +-
.../storm/redis/bolt/AbstractRedisBolt.java | 42 +
.../storm/redis/bolt/RedisLookupBolt.java | 23 +-
.../apache/storm/redis/bolt/RedisStoreBolt.java | 21 +
.../redis/common/config/JedisClusterConfig.java | 55 +-
.../redis/common/config/JedisPoolConfig.java | 66 +
.../common/container/JedisClusterContainer.java | 18 +
.../JedisCommandsContainerBuilder.java | 15 +
.../JedisCommandsInstanceContainer.java | 12 +
.../redis/common/container/JedisContainer.java | 16 +
.../common/mapper/RedisDataTypeDescription.java | 20 +
.../redis/common/mapper/RedisLookupMapper.java | 7 +-
.../storm/redis/common/mapper/RedisMapper.java | 9 +-
.../redis/common/mapper/RedisStoreMapper.java | 3 +
.../storm/redis/common/mapper/TupleMapper.java | 18 +-
.../trident/state/AbstractRedisMapState.java | 137 ++
.../state/AbstractRedisStateQuerier.java | 95 ++
.../state/AbstractRedisStateUpdater.java | 96 ++
.../storm/redis/trident/state/KeyFactory.java | 57 +
.../storm/redis/trident/state/Options.java | 40 +
.../trident/state/RedisClusterMapState.java | 284 ++--
.../redis/trident/state/RedisClusterState.java | 44 +-
.../trident/state/RedisClusterStateQuerier.java | 75 +-
.../trident/state/RedisClusterStateUpdater.java | 89 +-
.../redis/trident/state/RedisMapState.java | 333 +++--
.../storm/redis/trident/state/RedisState.java | 44 +-
.../redis/trident/state/RedisStateQuerier.java | 71 +-
.../redis/trident/state/RedisStateUpdater.java | 93 +-
.../redis/trident/WordCountLookupMapper.java | 57 +
.../redis/trident/WordCountStoreMapper.java | 39 +
.../redis/trident/WordCountTridentRedis.java | 12 +-
.../trident/WordCountTridentRedisCluster.java | 11 +-
.../WordCountTridentRedisClusterMap.java | 6 +-
.../redis/trident/WordCountTridentRedisMap.java | 8 +-
.../redis/trident/WordCountTupleMapper.java | 16 -
external/storm-solr/README.md | 201 +++
external/storm-solr/pom.xml | 98 ++
.../apache/storm/solr/bolt/SolrUpdateBolt.java | 136 ++
.../storm/solr/config/CountBasedCommit.java | 59 +
.../storm/solr/config/SolrCommitStrategy.java | 30 +
.../apache/storm/solr/config/SolrConfig.java | 42 +
.../storm/solr/mapper/SolrFieldsMapper.java | 182 +++
.../storm/solr/mapper/SolrJsonMapper.java | 160 +++
.../apache/storm/solr/mapper/SolrMapper.java | 32 +
.../storm/solr/mapper/SolrMapperException.java | 24 +
.../org/apache/storm/solr/schema/CopyField.java | 50 +
.../org/apache/storm/solr/schema/Field.java | 50 +
.../org/apache/storm/solr/schema/FieldType.java | 63 +
.../org/apache/storm/solr/schema/Schema.java | 116 ++
.../storm/solr/schema/SolrFieldTypeFinder.java | 182 +++
.../schema/builder/RestJsonSchemaBuilder.java | 69 +
.../solr/schema/builder/SchemaBuilder.java | 27 +
.../apache/storm/solr/trident/SolrState.java | 67 +
.../storm/solr/trident/SolrStateFactory.java | 44 +
.../apache/storm/solr/trident/SolrUpdater.java | 33 +
.../storm/solr/spout/SolrFieldsSpout.java | 76 +
.../apache/storm/solr/spout/SolrJsonSpout.java | 120 ++
.../storm/solr/topology/SolrFieldsTopology.java | 56 +
.../storm/solr/topology/SolrJsonTopology.java | 48 +
.../storm/solr/topology/SolrTopology.java | 82 ++
.../solr/trident/SolrFieldsTridentTopology.java | 45 +
.../solr/trident/SolrJsonTridentTopology.java | 45 +
.../org/apache/storm/solr/util/TestUtil.java | 30 +
log4j2/cluster.xml | 76 +
log4j2/worker.xml | 77 ++
logback/cluster.xml | 85 --
logback/worker.xml | 41 -
pom.xml | 99 +-
storm-core/pom.xml | 334 +++--
storm-core/src/clj/backtype/storm/cluster.clj | 152 +-
.../clj/backtype/storm/command/get_errors.clj | 52 +
.../backtype/storm/command/shell_submission.clj | 9 +-
storm-core/src/clj/backtype/storm/config.clj | 30 +-
storm-core/src/clj/backtype/storm/converter.clj | 49 +-
.../backtype/storm/daemon/builtin_metrics.clj | 25 +-
.../src/clj/backtype/storm/daemon/common.clj | 47 +-
.../src/clj/backtype/storm/daemon/executor.clj | 182 ++-
.../src/clj/backtype/storm/daemon/logviewer.clj | 63 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 382 +++--
.../clj/backtype/storm/daemon/supervisor.clj | 85 +-
.../src/clj/backtype/storm/daemon/task.clj | 1 -
.../src/clj/backtype/storm/daemon/worker.clj | 141 +-
storm-core/src/clj/backtype/storm/disruptor.clj | 23 +-
.../src/clj/backtype/storm/messaging/loader.clj | 13 +-
storm-core/src/clj/backtype/storm/thrift.clj | 23 +-
storm-core/src/clj/backtype/storm/timer.clj | 7 +-
storm-core/src/clj/backtype/storm/tuple.clj | 6 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 280 ++--
.../src/clj/backtype/storm/ui/helpers.clj | 12 +-
storm-core/src/clj/backtype/storm/util.clj | 27 +-
storm-core/src/clj/backtype/storm/zookeeper.clj | 134 +-
.../src/dev/drpc-simple-acl-test-scenario.yaml | 17 +
storm-core/src/jvm/backtype/storm/Config.java | 224 ++-
.../jvm/backtype/storm/ConfigValidation.java | 46 +-
.../src/jvm/backtype/storm/LogWriter.java | 83 ++
.../storm/codedistributor/ICodeDistributor.java | 56 +
.../LocalFileSystemCodeDistributor.java | 106 ++
.../coordination/BatchSubtopologyBuilder.java | 6 +-
.../src/jvm/backtype/storm/drpc/DRPCSpout.java | 6 +-
.../storm/drpc/LinearDRPCTopologyBuilder.java | 9 +-
.../backtype/storm/generated/Assignment.java | 194 +--
.../jvm/backtype/storm/generated/BoltStats.java | 342 ++---
.../storm/generated/ClusterSummary.java | 292 ++--
.../storm/generated/ClusterWorkerHeartbeat.java | 54 +-
.../storm/generated/ComponentPageInfo.java | 652 +++++++--
.../backtype/storm/generated/Credentials.java | 46 +-
.../backtype/storm/generated/DebugOptions.java | 506 +++++++
.../backtype/storm/generated/ExecutorStats.java | 162 +--
.../storm/generated/LSApprovedWorkers.java | 46 +-
.../generated/LSSupervisorAssignments.java | 50 +-
.../storm/generated/LSWorkerHeartbeat.java | 38 +-
.../storm/generated/LocalAssignment.java | 38 +-
.../storm/generated/LocalStateData.java | 50 +-
.../jvm/backtype/storm/generated/Nimbus.java | 1305 +++++++++++++++++-
.../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++
.../jvm/backtype/storm/generated/NodeInfo.java | 34 +-
.../storm/generated/RebalanceOptions.java | 46 +-
.../backtype/storm/generated/SpoutStats.java | 226 +--
.../jvm/backtype/storm/generated/StormBase.java | 225 ++-
.../storm/generated/SupervisorInfo.java | 110 +-
.../backtype/storm/generated/TopologyInfo.java | 398 +++++-
.../storm/generated/TopologyPageInfo.java | 316 ++++-
.../backtype/storm/generated/TopologyStats.java | 222 +--
.../storm/generated/TopologySummary.java | 109 +-
.../storm/grouping/PartialKeyGrouping.java | 27 +-
.../storm/messaging/ConnectionWithStatus.java | 17 +
.../backtype/storm/messaging/netty/Client.java | 476 +++----
.../backtype/storm/messaging/netty/Context.java | 20 +-
.../storm/messaging/netty/MessageBatch.java | 24 +-
.../storm/messaging/netty/MessageBuffer.java | 58 +
.../messaging/netty/SaslStormClientHandler.java | 4 +-
.../messaging/netty/StormClientHandler.java | 46 +
.../netty/StormClientPipelineFactory.java | 4 +-
.../backtype/storm/metric/EventLoggerBolt.java | 58 +
.../storm/metric/FileBasedEventLogger.java | 105 ++
.../jvm/backtype/storm/metric/IEventLogger.java | 59 +
.../storm/metric/MetricsConsumerBolt.java | 2 +-
.../jvm/backtype/storm/metric/SystemBolt.java | 15 +
.../backtype/storm/nimbus/ILeaderElector.java | 60 +
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 93 ++
.../security/auth/SaslTransportPlugin.java | 8 +-
.../security/auth/ShellBasedGroupsMapping.java | 2 +-
.../auth/authorizer/DRPCAuthorizerBase.java | 17 +
.../authorizer/DRPCSimpleACLAuthorizer.java | 20 +-
.../authorizer/ImpersonationAuthorizer.java | 19 +-
.../kerberos/KerberosSaslTransportPlugin.java | 5 +-
.../security/auth/kerberos/NoOpTTrasport.java | 40 +
.../auth/kerberos/jaas_kerberos_cluster.conf | 20 +-
.../auth/kerberos/jaas_kerberos_launcher.conf | 19 +
.../serialization/BlowfishTupleSerializer.java | 5 +-
.../storm/spout/ISpoutOutputCollector.java | 6 +-
.../jvm/backtype/storm/spout/ShellSpout.java | 10 +
.../storm/spout/SpoutOutputCollector.java | 5 +
.../src/jvm/backtype/storm/task/ShellBolt.java | 17 +-
.../backtype/storm/task/TopologyContext.java | 61 +-
.../testing/OpaqueMemoryTransactionalSpout.java | 2 +-
.../backtype/storm/testing/SpoutTracker.java | 6 +
.../storm/testing/TestWordBytesCounter.java | 27 +
.../backtype/storm/testing/TestWordCounter.java | 6 +-
.../topology/BaseConfigurationDeclarer.java | 2 +-
.../ComponentConfigurationDeclarer.java | 2 +-
.../storm/topology/IBasicOutputCollector.java | 5 +-
.../storm/topology/TopologyBuilder.java | 34 +-
.../TransactionalTopologyBuilder.java | 12 +-
.../src/jvm/backtype/storm/tuple/Fields.java | 21 +-
.../src/jvm/backtype/storm/tuple/ITuple.java | 126 +-
.../src/jvm/backtype/storm/tuple/Tuple.java | 3 +-
.../utils/DisruptorBackpressureCallback.java | 27 +
.../backtype/storm/utils/DisruptorQueue.java | 239 +++-
.../jvm/backtype/storm/utils/NimbusClient.java | 78 +-
.../jvm/backtype/storm/utils/RateTracker.java | 119 ++
.../jvm/backtype/storm/utils/RotatingMap.java | 12 +-
.../jvm/backtype/storm/utils/ShellProcess.java | 29 +-
.../StormBoundedExponentialBackoffRetry.java | 4 +-
.../jvm/backtype/storm/utils/TimeCacheMap.java | 60 +-
.../backtype/storm/utils/TransferDrainer.java | 62 +-
.../src/jvm/backtype/storm/utils/Utils.java | 67 +-
.../storm/utils/WorkerBackpressureCallback.java | 26 +
.../storm/utils/WorkerBackpressureThread.java | 59 +
.../src/jvm/storm/trident/TridentTopology.java | 13 +-
.../storm/trident/planner/SubtopologyBolt.java | 2 +-
.../storm/trident/spout/BatchSpoutExecutor.java | 2 +-
.../jvm/storm/trident/spout/IBatchSpout.java | 2 +-
.../spout/IOpaquePartitionedTridentSpout.java | 2 +-
.../trident/spout/IPartitionedTridentSpout.java | 2 +-
.../jvm/storm/trident/spout/ITridentSpout.java | 6 +-
.../OpaquePartitionedTridentSpoutExecutor.java | 20 +-
.../spout/PartitionedTridentSpoutExecutor.java | 26 +-
.../trident/spout/RichSpoutBatchExecutor.java | 15 +-
.../trident/spout/RichSpoutBatchTriggerer.java | 18 +-
.../trident/spout/TridentSpoutCoordinator.java | 6 +-
.../trident/spout/TridentSpoutExecutor.java | 10 +-
.../storm/trident/testing/FeederBatchSpout.java | 4 +-
.../testing/FeederCommitterBatchSpout.java | 2 +-
.../storm/trident/testing/FixedBatchSpout.java | 2 +-
.../trident/topology/TridentBoltExecutor.java | 17 +-
.../topology/TridentTopologyBuilder.java | 22 +-
.../worker-launcher/.deps/worker-launcher.Po | 16 +
storm-core/src/py/storm/Nimbus-remote | 7 +
storm-core/src/py/storm/Nimbus.py | 262 ++++
storm-core/src/py/storm/ttypes.py | 1268 +++++++++++------
storm-core/src/storm.thrift | 32 +-
storm-core/src/ui/public/component.html | 8 +
storm-core/src/ui/public/index.html | 21 +
storm-core/src/ui/public/js/script.js | 80 +-
.../templates/component-page-template.html | 19 +-
.../public/templates/index-page-template.html | 59 +-
.../templates/topology-page-template.html | 26 +-
storm-core/src/ui/public/topology.html | 38 +-
.../test/clj/backtype/storm/cluster_test.clj | 27 +-
.../test/clj/backtype/storm/config_test.clj | 65 +-
.../test/clj/backtype/storm/grouping_test.clj | 43 +-
.../test/clj/backtype/storm/logviewer_test.clj | 8 +-
.../storm/messaging/netty_unit_test.clj | 2 +-
.../test/clj/backtype/storm/nimbus_test.clj | 309 +++--
.../auth/DefaultHttpCredentialsPlugin_test.clj | 15 +
.../backtype/storm/security/auth/auth_test.clj | 4 +-
.../authorizer/DRPCSimpleACLAuthorizer_test.clj | 15 +
.../storm/security/auth/drpc-auth-alice.jaas | 17 +
.../storm/security/auth/drpc-auth-bob.jaas | 17 +
.../storm/security/auth/drpc-auth-charlie.jaas | 17 +
.../storm/security/auth/drpc-auth-server.jaas | 17 +
.../storm/security/auth/drpc_auth_test.clj | 6 +-
.../storm/security/auth/nimbus_auth_test.clj | 14 +-
.../test/clj/backtype/storm/supervisor_test.clj | 33 +-
.../test/clj/backtype/storm/utils_test.clj | 12 -
.../storm/topology/TopologyBuilderTest.java | 48 +
.../utils/DisruptorQueueBackpressureTest.java | 115 ++
.../storm/utils/DisruptorQueueTest.java | 2 +-
.../backtype/storm/utils/RateTrackerTest.java | 62 +
storm-core/test/resources/log4j2-test.xml | 32 +
storm-core/test/resources/logback-test.xml | 26 -
storm-core/test/resources/test_runner.clj | 114 ++
storm-dist/binary/LICENSE | 224 ++-
storm-dist/binary/NOTICE | 4 +-
storm-dist/binary/src/main/assembly/binary.xml | 132 +-
484 files changed, 28370 insertions(+), 6657 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/common.clj
index e3a10ef,9da0e4f..a30850d
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@@ -273,6 -293,26 +293,26 @@@
(metrics-consumer-register-ids storm-conf)
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
+ ;; return the fields that event logger bolt expects
+ (defn eventlogger-bolt-fields []
+ [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID) (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)]
+ )
+
+ (defn add-eventlogger! [storm-conf ^StormTopology ret]
+ (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS))
+ eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
+ (EventLoggerBolt.)
+ {}
+ :p num-executors
+ :conf {TOPOLOGY-TASKS num-executors
+ TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
+
+ (doseq [[_ component] (all-components ret)
+ :let [common (.get_common component)]]
+ (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields))))
- (.put_to_bolts ret "__eventlogger" eventlogger-bolt)
++ (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
+ ))
+
(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
(.put_to_bolts topology comp-id bolt-spec)))
http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 9e880bb,7ea515b..789190c
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -32,11 -35,10 +35,14 @@@
(:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
- ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
+ ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice
+ ComponentPageInfo TopologyPageInfo])
(:import [backtype.storm.daemon Shutdownable])
- (:use [backtype.storm util config log timer])
- (:require [backtype.storm [cluster :as cluster] [stats :as stats]])
+ (:use [backtype.storm util config log timer zookeeper])
- (:require [backtype.storm [cluster :as cluster] [stats :as stats] [converter :as converter]])
++ (:require [backtype.storm [cluster :as cluster]
++ [converter :as converter]
++ [stats :as stats]
++ [tuple :as tuple]])
(:require [clojure.set :as set])
(:import [backtype.storm.daemon.common StormBase Assignment])
(:use [backtype.storm.daemon common])
@@@ -1001,46 -1071,27 +1068,62 @@@
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)
- principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
+ principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
+ get-common-topo-info
+ (fn [^String storm-id operation]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ topology-conf (try-read-storm-conf conf storm-id)
+ storm-name (topology-conf TOPOLOGY-NAME)
+ _ (check-authorization! nimbus
+ storm-name
+ topology-conf
+ operation)
+ topology (try-read-storm-topology conf storm-id)
+ task->component (storm-task-info topology topology-conf)
+ base (.storm-base storm-cluster-state storm-id nil)
+ launch-time-secs (if base (:launch-time-secs base)
+ (throw
+ (NotAliveException. (str storm-id))))
+ assignment (.assignment-info storm-cluster-state storm-id nil)
+ beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
+ storm-id))
+ all-components (set (vals task->component))]
+ {:storm-name storm-name
+ :storm-cluster-state storm-cluster-state
+ :all-components all-components
+ :launch-time-secs launch-time-secs
+ :assignment assignment
+ :beats beats
+ :topology topology
+ :task->component task->component
+ :base base}))
+ get-last-error (fn [storm-cluster-state storm-id component-id]
+ (if-let [e (.last-error storm-cluster-state
+ storm-id
+ component-id)]
+ (doto (ErrorInfo. (:error e) (:time-secs e))
+ (.set_host (:host e))
+ (.set_port (:port e)))))]
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
+
+ ;add to nimbuses
+ (.add-nimbus-host! (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus))
+ (NimbusSummary.
+ (.getHost (:nimbus-host-port-info nimbus))
+ (.getPort (:nimbus-host-port-info nimbus))
+ (current-time-secs)
+ false ;is-leader
+ (str (VersionInfo/getVersion))))
+
+ (.addToLeaderLockQueue (:leader-elector nimbus))
(cleanup-corrupt-topologies! nimbus)
- (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
- (transition! nimbus storm-id :startup))
+ (setup-code-distributor nimbus)
+
+ ;register call back for code-distributor
+ (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
+ (when (is-leader nimbus :throw-exception false)
+ (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
+ (transition! nimbus storm-id :startup)))
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
@@@ -1364,63 -1465,11 +1495,94 @@@
topo-info
))
-- (^TopologyInfo getTopologyInfo [this ^String storm-id]
++ (^TopologyInfo getTopologyInfo [this ^String topology-id] ; convenience overload: same as getTopologyInfoWithOpts with a fixed options object
(.getTopologyInfoWithOpts this
-- storm-id
++ topology-id
(doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL)))) ; options request NumErrorsChoice/ALL for the error listing
+ (^TopologyPageInfo getTopologyPageInfo ; builds the TopologyPageInfo for one topology and stats window
- [this ^String storm-id ^String window ^boolean include-sys?]
- (let [info (get-common-topo-info storm-id "getTopologyPageInfo")
++ [this ^String topo-id ^String window ^boolean include-sys?]
++ (let [info (get-common-topo-info topo-id "getTopologyPageInfo") ; second argument is the operation name handed to the helper
+
+ exec->node+port (:executor->node+port (:assignment info))
+ last-err-fn (partial get-last-error ; pre-binds cluster state and topology id; the remaining argument is a component id
+ (:storm-cluster-state info)
- storm-id)
- topo-page-info (stats/agg-topo-execs-stats storm-id
++ topo-id)
++ topo-page-info (stats/agg-topo-execs-stats topo-id ; aggregation itself is delegated to the stats namespace
+ exec->node+port
+ (:task->component info)
+ (:beats info)
+ (:topology info)
+ window
+ include-sys?
+ last-err-fn)]
+ (when-let [owner (:owner (:base info))] ; owner is only set when the base record carries one
+ (.set_owner topo-page-info owner))
- (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)]
++ (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] ; scheduler status is only set when an entry exists for this topology
+ (.set_sched_status topo-page-info sched-status))
+ (doto topo-page-info
+ (.set_name (:storm-name info))
+ (.set_status (extract-status-str (:base info)))
+ (.set_uptime_secs (time-delta (:launch-time-secs info)))
+ (.set_topology_conf (to-json (try-read-storm-conf conf ; the topology conf is shipped as a JSON string
- storm-id))))))
++ topo-id)))
++ (.set_replication_count
++ (.getReplicationCount (:code-distributor nimbus) topo-id)))
++ (when-let [debug-options
++ (get-in info [:base :component->debug topo-id])] ; topology-level debug options are looked up under the topology id key
++ (.set_debug_options
++ topo-page-info
++ (converter/thriftify-debugoptions debug-options)))
++ topo-page-info)) ; the values of the forms above are discarded; the mutated struct is returned explicitly
+
++ ;; Builds the ComponentPageInfo for one component of a topology: aggregates
++ ;; executor stats via stats/agg-comp-execs-stats, then sets the topology name,
++ ;; component errors, topology status, per-component debug options (when
++ ;; present) and the host/port of the executor running the event logger task
++ ;; this component's events hash to. Returns the populated struct.
+ (^ComponentPageInfo getComponentPageInfo
+ [this
- ^String topology-id
++ ^String topo-id
+ ^String component-id
+ ^String window
+ ^boolean include-sys?]
- (let [info (get-common-topo-info topology-id "getComponentPageInfo")
++ (let [info (get-common-topo-info topo-id "getComponentPageInfo")
+ {:keys [executor->node+port node->host]} (:assignment info)
+ executor->host+port (map-val (fn [[node port]]
+ [(node->host node) port])
+ executor->node+port)
- ret (stats/agg-comp-execs-stats executor->host+port
- (:task->component info)
- (:beats info)
- window
- include-sys?
- topology-id
- (:topology info)
- component-id)]
- (doto ret
++ comp-page-info (stats/agg-comp-execs-stats executor->host+port
++ (:task->component info)
++ (:beats info)
++ window
++ include-sys?
++ topo-id
++ (:topology info)
++ component-id)]
++ (doto comp-page-info
+ (.set_topology_name (:storm-name info))
+ (.set_errors (get-errors (:storm-cluster-state info)
- topology-id
- component-id)))))
++ topo-id
++ component-id))
++ (.set_topology_status (extract-status-str (:base info))))
++ (when-let [debug-options
++ (get-in info [:base :component->debug component-id])]
++ (.set_debug_options
++ comp-page-info
++ (converter/thriftify-debugoptions debug-options)))
++ ;; Add the event logger details.
++ (let [component->tasks (reverse-map (:task->component info))
++ eventlogger-tasks (sort (get component->tasks
++ EVENTLOGGER-COMPONENT-ID))]
++ ;; `mod` by zero throws ArithmeticException, so skip the lookup entirely
++ ;; when no event logger tasks are found for this topology.
++ (when (seq eventlogger-tasks)
++ (let [;; Find the task the events from this component route to.
++ task-index (mod (tuple/list-hash-code [component-id])
++ (count eventlogger-tasks))
++ task-id (nth eventlogger-tasks task-index)
++ eventlogger-exec (first (filter (fn [[start stop]]
++ (between? task-id start stop))
++ (keys executor->host+port)))
++ [host port] (get executor->host+port eventlogger-exec)]
++ (if (and host port)
++ (doto comp-page-info
++ (.set_eventlog_host host)
++ (.set_eventlog_port port))))))
++ comp-page-info))
+
Shutdownable
(shutdown [this]
(log-message "Shutting down master")
http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 0384316,414bfb1..4ae731d
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -21,20 -21,18 +21,21 @@@
ring.middleware.multipart-params)
(:use [ring.middleware.json :only [wrap-json-params]])
(:use [hiccup core page-helpers])
- (:use [backtype.storm config util log stats])
- (:use [backtype.storm config util log tuple zookeeper])
++ (:use [backtype.storm config util log stats tuple zookeeper])
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
(:use [clojure.string :only [blank? lower-case trim]])
- (:import [backtype.storm.utils Utils])
+ (:import [backtype.storm.utils Utils]
+ [backtype.storm.generated NimbusSummary])
(:import [backtype.storm.generated ExecutorSpecificStats
- ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
+ ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
ErrorInfo ClusterSummary SupervisorSummary TopologySummary
Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
- KillOptions GetInfoOptions NumErrorsChoice TopologyPageInfo
- KillOptions GetInfoOptions NumErrorsChoice DebugOptions])
++ KillOptions GetInfoOptions NumErrorsChoice DebugOptions TopologyPageInfo
+ TopologyStats CommonAggregateStats ComponentAggregateStats
+ ComponentType BoltAggregateStats SpoutAggregateStats
+ ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo])
(:import [backtype.storm.security.auth AuthUtils ReqContext])
(:import [backtype.storm.generated AuthorizationException])
(:import [backtype.storm.security.auth AuthUtils])
@@@ -115,11 -263,102 +108,28 @@@
(let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
(into (sorted-map) ret )))
- (defn worker-log-link [host port topology-id]
-(defn error-subset
- [error-str]
- (apply str (take 200 error-str)))
-
-(defn most-recent-error
- [errors-list]
- (let [error (->> errors-list
- (sort-by #(.get_error_time_secs ^ErrorInfo %))
- reverse
- first)]
- error))
-
-(defn component-task-summs
- [^TopologyInfo summ topology id]
- (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
- bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
- spout-comp-summs (group-by-comp spout-summs)
- bolt-comp-summs (group-by-comp bolt-summs)
- ret (if (contains? spout-comp-summs id)
- (spout-comp-summs id)
- (bolt-comp-summs id))]
- (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
-
+ (defn logviewer-link
+   "Returns the logviewer URL for log file `fname` on `host`. The https form
+   is used only when `secure?` is truthy and LOGVIEWER-HTTPS-PORT is set in
+   *STORM-CONF*; otherwise the plain http form with LOGVIEWER-PORT is used."
+   [host fname secure?]
+   (let [https-port (when secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT))]
+     (if https-port
+       (url-format "https://%s:%s/log?file=%s" host https-port fname)
+       (url-format "http://%s:%s/log?file=%s"
+                   host
+                   (*STORM-CONF* LOGVIEWER-PORT)
+                   fname))))
+
-(defn executor-has-task-id? [task-id executor-info]
- (between? task-id (.get_task_start executor-info) (.get_task_end executor-info)))
-
-(defn get-host-port [task-id executor-summs]
- (let [ex-sum (some #(if (executor-has-task-id? task-id (.get_executor_info %)) %) executor-summs)]
- {:host (.get_host ex-sum) :port (.get_port ex-sum)}))
-
-(defn get-sorted-eventlogger-task-ids [executor-summs]
- (let [executor-infos (map #(.get_executor_info %) executor-summs)]
- (sort (flatten (map #(range (.get_task_start %) (inc (.get_task_end %))) executor-infos)))))
-
-(defn get-eventlogger-executor-summs [^TopologyInfo topology-info topology]
- (let [bolt-summs (filter (partial bolt-summary? topology) (.get_executors topology-info))]
- ((group-by-comp bolt-summs) "__eventlogger")))
-
-;
-; The eventlogger uses fields grouping on the component-id so that events from same component
-; always goes to the same event logger task. Here we use the same fields grouping
-; to find the correct eventlogger task.
-(defn get-mapped-task-id [sorted-task-ids ^String component-id]
- (nth sorted-task-ids (mod (list-hash-code [component-id]) (count sorted-task-ids))))
-
+ (defn event-log-link ; builds the logviewer URL for a topology's event log file on the given worker host/port
- [topology-id ^TopologyInfo topology-info topology component-id secure?]
- (let [executor-summs (get-eventlogger-executor-summs topology-info topology)
- sorted-task-ids (get-sorted-eventlogger-task-ids executor-summs)
- mapped-task-id (get-mapped-task-id sorted-task-ids component-id)
- host-port (get-host-port mapped-task-id executor-summs)
- fname (event-logs-filename topology-id (host-port :port))]
- (logviewer-link (host-port :host) fname secure?)))
++ [topology-id component-id host port secure?] ; component-id is accepted but not referenced in the body below
++ (logviewer-link host (event-logs-filename topology-id port) secure?)) ; file name comes from topology id + port; URL building is delegated to logviewer-link
+
+ (defn worker-log-link [host port topology-id secure?] ; logviewer URL for the worker log of `topology-id` on host/port
(let [fname (logs-filename topology-id port)] ; log file name is derived from the topology id and the worker port
- (url-format (str "http://%s:%s/log?file=%s")
- host (*STORM-CONF* LOGVIEWER-PORT) fname)))
+ (logviewer-link host fname secure?))) ; scheme/port selection is left to logviewer-link
+ (defn nimbus-log-link [host port]
++ ;; Returns the http logviewer URL for nimbus.log on `host`. The URL port is
++ ;; LOGVIEWER-PORT from *STORM-CONF*; `port` is kept in the signature for
++ ;; callers but has no placeholder in the format string, so it is not passed.
++ (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT)))
-
-(defn compute-executor-capacity
- [^ExecutorSummary e]
- (let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-bolt-stats true)
- (aggregate-bolt-streams)
- swap-map-order
- (get "600")))
- uptime (nil-to-zero (.get_uptime_secs e))
- window (if (< uptime 600) uptime 600)
- executed (-> stats :executed nil-to-zero)
- latency (-> stats :execute-latencies nil-to-zero)]
- (if (> window 0)
- (div (* executed latency) (* 1000 window)))))
-
-(defn compute-bolt-capacity
- [executors]
- (->> executors
- (map compute-executor-capacity)
- (map nil-to-zero)
- (apply max)))
-
(defn get-error-time
[error]
(if error
@@@ -132,9 -371,10 +142,10 @@@
""))
(defn get-error-port
- [error error-host top-id]
+ [error]
(if error
- (.get_port ^ErrorInfo error)))
+ (.get_port ^ErrorInfo error)
+ ""))
(defn get-error-host
[error]
@@@ -142,6 -382,41 +153,12 @@@
(.get_host ^ErrorInfo error)
""))
-(defn spout-streams-stats
- [summs include-sys?]
- (let [stats-seq (get-filled-stats summs)]
- (aggregate-spout-streams
- (aggregate-spout-stats
- stats-seq include-sys?))))
-
-(defn bolt-streams-stats
- [summs include-sys?]
- (let [stats-seq (get-filled-stats summs)]
- (aggregate-bolt-streams
- (aggregate-bolt-stats
- stats-seq include-sys?))))
-
-(defn total-aggregate-stats
- [spout-summs bolt-summs include-sys?]
- (let [spout-stats (get-filled-stats spout-summs)
- bolt-stats (get-filled-stats bolt-summs)
- agg-spout-stats (-> spout-stats
- (aggregate-spout-stats include-sys?)
- aggregate-spout-streams)
- agg-bolt-stats (-> bolt-stats
- (aggregate-bolt-stats include-sys?)
- aggregate-bolt-streams)]
- (merge-with
- (fn [s1 s2]
- (merge-with + s1 s2))
- (select-keys
- agg-bolt-stats
- ;; Include only keys that will be used. We want to count acked and
- ;; failed only for the "tuple trees," so we do not include those keys
- ;; from the bolt executors.
- [:emitted :transferred])
- agg-spout-stats)))
++(defn get-error-time ; NOTE(review): a `(defn get-error-time [error] ...)` also appears earlier in this file; a later defn replaces the earlier var binding when the namespace loads - confirm the duplicate is intended
++ [error]
++ (if error
++ (.get_error_time_secs ^ErrorInfo error) ; returns the ErrorInfo's error_time_secs value unchanged
++ "")) ; empty string when no error is given
+
(defn stats-times
[stats-map]
(sort-by #(Integer/parseInt %)
@@@ -353,145 -671,185 +401,154 @@@
"tasksTotal" (.get_num_tasks t)
"workersTotal" (.get_num_workers t)
"executorsTotal" (.get_num_executors t)
+ "replicationCount" (.get_replication_count t)
"schedulerInfo" (.get_sched_status t)})}))
-(defn topology-stats [id window stats]
+(defn topology-stats [window stats]
(let [times (stats-times (:emitted stats))
display-map (into {} (for [t times] [t pretty-uptime-sec]))
display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
+ (for [w (concat times [":all-time"])
+ :let [disp ((display-map w) w)]]
{"windowPretty" disp
- "window" k
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "completeLatency" (float-str (get-in stats [:complete-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
-
-(defn spout-comp [top-id summ-map errors window include-sys? secure?]
- (for [[id summs] summ-map
- :let [stats-seq (get-filled-stats summs)
- stats (aggregate-spout-streams
- (aggregate-spout-stats
- stats-seq include-sys?))
- last-error (most-recent-error (get errors id))
- error-host (get-error-host last-error)
- error-port (get-error-port last-error error-host top-id)]]
- {"spoutId" id
- "encodedSpoutId" (url-encode id)
- "executors" (count summs)
- "tasks" (sum-tasks summs)
- "emitted" (get-in stats [:emitted window])
- "transferred" (get-in stats [:transferred window])
- "completeLatency" (float-str (get-in stats [:complete-latencies window]))
- "acked" (get-in stats [:acked window])
- "failed" (get-in stats [:failed window])
- "errorHost" error-host
- "errorPort" error-port
- "errorWorkerLogLink" (worker-log-link error-host error-port top-id secure?)
- "errorLapsedSecs" (get-error-time last-error)
- "lastError" (get-error-data last-error)
- "time" (if last-error (* 1000 (long (.get_error_time_secs ^ErrorInfo last-error))))}))
-
-(defn bolt-comp [top-id summ-map errors window include-sys? secure?]
- (for [[id summs] summ-map
- :let [stats-seq (get-filled-stats summs)
- stats (aggregate-bolt-streams
- (aggregate-bolt-stats
- stats-seq include-sys?))
- last-error (most-recent-error (get errors id))
- error-host (get-error-host last-error)
- error-port (get-error-port last-error error-host top-id)]]
- {"boltId" id
- "encodedBoltId" (url-encode id)
- "executors" (count summs)
- "tasks" (sum-tasks summs)
- "emitted" (get-in stats [:emitted window])
- "transferred" (get-in stats [:transferred window])
- "capacity" (float-str (nil-to-zero (compute-bolt-capacity summs)))
- "executeLatency" (float-str (get-in stats [:execute-latencies window]))
- "executed" (get-in stats [:executed window])
- "processLatency" (float-str (get-in stats [:process-latencies window]))
- "acked" (get-in stats [:acked window])
- "failed" (get-in stats [:failed window])
- "errorHost" error-host
- "errorPort" error-port
- "errorWorkerLogLink" (worker-log-link error-host error-port top-id secure?)
- "errorLapsedSecs" (get-error-time last-error)
- "lastError" (get-error-data last-error)
- "time" (if last-error (* 1000 (long (.get_error_time_secs ^ErrorInfo last-error))))}))
-
-(defn topology-summary [^TopologyInfo summ]
- (let [executors (.get_executors summ)
- workers (set (for [^ExecutorSummary e executors]
- [(.get_host e) (.get_port e)]))
- topology-id (.get_id summ)
- debug-options (get (.get_component_debug summ) topology-id)]
- {"id" topology-id
- "encodedId" (url-encode (.get_id summ))
- "owner" (.get_owner summ)
- "name" (.get_name summ)
- "status" (.get_status summ)
- "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
- "tasksTotal" (sum-tasks executors)
- "workersTotal" (count workers)
- "executorsTotal" (count executors)
- "schedulerInfo" (.get_sched_status summ)
- "debug" (if (not-nil? debug-options) (.is_enable debug-options) false)
- "samplingPct" (if (not-nil? debug-options) (.get_samplingpct debug-options) 10)
- "replicationCount" (.get_replication_count summ)}))
-
-(defn spout-summary-json [topology-id id stats window]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- {"windowPretty" disp
- "window" k
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "completeLatency" (float-str (get-in stats [:complete-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
+ "window" w
+ "emitted" (get-in stats [:emitted w])
+ "transferred" (get-in stats [:transferred w])
+ "completeLatency" (float-str (get-in stats [:complete-latencies w]))
+ "acked" (get-in stats [:acked w])
+ "failed" (get-in stats [:failed w])})))
+
+(defn build-visualization [id window include-sys? user] ; returns {"visualizationTable" ...} for topology `id`; `user` is accepted but not referenced in this body
- (with-nimbus nimbus
++ (thrift/with-configured-nimbus-connection nimbus
+ (let [window (if window window ":all-time") ; fall back to the ":all-time" window when none is given
+ topology-info (->> (doto
+ (GetInfoOptions.)
+ (.set_num_err_choice NumErrorsChoice/ONE)) ; topology info is requested with NumErrorsChoice/ONE
+ (.getTopologyInfoWithOpts ^Nimbus$Client nimbus
+ id))
+ storm-topology (.getTopology ^Nimbus$Client nimbus id)
+ spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
+ bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
+ spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
+ bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
+ bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries) ; only the bolt map is passed through the include-sys? filter
+ id->spout-spec (.get_spouts storm-topology)
+ id->bolt (.get_bolts storm-topology)
+ visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec) ; spout and bolt specs are merged into one component map
+ (hashmap-to-persistent id->bolt))
+ spout-comp-id->executor-summaries
+ bolt-comp-id->executor-summaries
+ window
+ id)]
+ {"visualizationTable" (stream-boxes visualizer-data)})))
+
+(defn- get-error-json
++ "Returns the JSON map fragment describing a component's last error: message, timestamp, host, port, seconds elapsed since the error and a worker log link. `error-info` may be nil."
- [topo-id error-info]
++ [topo-id error-info secure?]
+ (let [host (get-error-host error-info)
+ port (get-error-port error-info)]
+ {"lastError" (get-error-data error-info)
++ "errorTime" (get-error-time error-info)
+ "errorHost" host
+ "errorPort" port
++ "errorLapsedSecs" (if error-info (time-delta (.get_error_time_secs ^ErrorInfo error-info)) "") ; elapsed seconds, not a repeat of "errorTime"; assumes time-delta from backtype.storm.util is referred here - TODO confirm
- "errorWorkerLogLink" (worker-log-link host port topo-id)}))
++ "errorWorkerLogLink" (worker-log-link host port topo-id secure?)}))
+
+(defn- common-agg-stats-json
+  "Maps the counters held by a CommonAggregateStats (executor and task
+  counts, emitted, transferred, acked, failed) to their JSON field names."
+  [^CommonAggregateStats common-stats]
+  (let [^CommonAggregateStats cs common-stats]
+    {"executors" (.get_num_executors cs)
+     "tasks" (.get_num_tasks cs)
+     "emitted" (.get_emitted cs)
+     "transferred" (.get_transferred cs)
+     "acked" (.get_acked cs)
+     "failed" (.get_failed cs)}))
+
+(defmulti comp-agg-stats-json
+ "Returns a JSON representation of aggregated statistics."
- (fn [_ [id ^ComponentAggregateStats s]] (.get_type s)))
++ (fn [_ _ [id ^ComponentAggregateStats s]] (.get_type s))) ; args are topo-id, secure? and an [id stats] pair; dispatch value is the stats' ComponentType
+
+(defmethod comp-agg-stats-json ComponentType/SPOUT ; spout row: common counters + last-error fields + spout id and complete latency
- [topo-id [id ^ComponentAggregateStats s]]
++ [topo-id secure? [id ^ComponentAggregateStats s]]
+ (let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout)
+ cs (.get_common_stats s)]
+ (merge ; later maps win on key clashes
+ (common-agg-stats-json cs)
- (get-error-json topo-id (.get_last_error s))
++ (get-error-json topo-id (.get_last_error s) secure?) ; secure? is forwarded so the worker log link can use https
+ {"spoutId" id
+ "encodedSpoutId" (url-encode id)
+ "completeLatency" (float-str (.get_complete_latency_ms ss))})))
+
+(defmethod comp-agg-stats-json ComponentType/BOLT ; bolt row: common counters + last-error fields + bolt id, capacity, latencies and executed count
- [topo-id [id ^ComponentAggregateStats s]]
++ [topo-id secure? [id ^ComponentAggregateStats s]]
+ (let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt)
+ cs (.get_common_stats s)]
+ (merge ; later maps win on key clashes
+ (common-agg-stats-json cs)
- (get-error-json topo-id (.get_last_error s))
++ (get-error-json topo-id (.get_last_error s) secure?) ; secure? is forwarded so the worker log link can use https
+ {"boltId" id
+ "encodedBoltId" (url-encode id)
+ "capacity" (float-str (.get_capacity ss))
+ "executeLatency" (float-str (.get_execute_latency_ms ss))
+ "executed" (.get_executed ss)
+ "processLatency" (float-str (.get_process_latency_ms ss))})))
+
+(defn- unpack-topology-page-info
+ "Unpacks the serialized object to data structures"
- [^TopologyPageInfo topo-info window]
++ [^TopologyPageInfo topo-info window secure?] ; secure? is only forwarded to comp-agg-stats-json below
+ (let [id (.get_id topo-info)
+ ^TopologyStats topo-stats (.get_topology_stats topo-info)
+ stat->window->number ; stat keyword -> the window-keyed map returned by the matching TopologyStats getter
+ {:emitted (.get_window_to_emitted topo-stats)
+ :transferred (.get_window_to_transferred topo-stats)
+ :complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
+ :acked (.get_window_to_acked topo-stats)
+ :failed (.get_window_to_failed topo-stats)}
- topo-stats (topology-stats window stat->window->number)]
++ topo-stats (topology-stats window stat->window->number) ; rebinds topo-stats: from here on it is the output of topology-stats, not the TopologyStats struct
++ [debugEnabled
++ samplingPct] (if-let [debug-opts (.get_debug_options topo-info)] ; both stay nil when the page info carries no debug options
++ [(.is_enable debug-opts)
++ (.get_samplingpct debug-opts)])]
+ {"id" id
+ "encodedId" (url-encode id)
+ "owner" (.get_owner topo-info)
+ "name" (.get_name topo-info)
+ "status" (.get_status topo-info)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs topo-info))
+ "tasksTotal" (.get_num_tasks topo-info)
+ "workersTotal" (.get_num_workers topo-info)
+ "executorsTotal" (.get_num_executors topo-info)
+ "schedulerInfo" (.get_sched_status topo-info)
+ "topologyStats" topo-stats
- "spouts" (map (partial comp-agg-stats-json id)
++ "spouts" (map (partial comp-agg-stats-json id secure?) ; one JSON map per entry of the id -> spout stats map
+ (.get_id_to_spout_agg_stats topo-info))
- "bolts" (map (partial comp-agg-stats-json id)
++ "bolts" (map (partial comp-agg-stats-json id secure?) ; one JSON map per entry of the id -> bolt stats map
+ (.get_id_to_bolt_agg_stats topo-info))
- "configuration" (.get_topology_conf topo-info)}))
++ "configuration" (.get_topology_conf topo-info)
++ "debug" (or debugEnabled false) ; false when debug options are absent
++ "samplingPct" (or samplingPct 10) ; falls back to 10 when debug options are absent
++ "replicationCount" (.get_replication_count topo-info)}))
- (defn topology-page [id window include-sys? user]
- (with-nimbus nimbus
- (let [window (or window ":all-time")
+ (defn topology-page [id window include-sys? user secure?]
+ (thrift/with-configured-nimbus-connection nimbus
+ (let [window (if window window ":all-time")
window-hint (window-hint window)
- summ (->> (doto
- (GetInfoOptions.)
- (.set_num_err_choice NumErrorsChoice/ONE))
- (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
- topology (.getTopology ^Nimbus$Client nimbus id)
- topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
- spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
- bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
- spout-comp-summs (group-by-comp spout-summs)
- bolt-comp-summs (group-by-comp bolt-summs)
- bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
- name (.get_name summ)
- status (.get_status summ)
- msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
- spouts (.get_spouts topology)
- bolts (.get_bolts topology)
- replication-count (.get_replication_count summ)
- visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
- (hashmap-to-persistent bolts))
- spout-comp-summs
- bolt-comp-summs
- window
- id)]
+ topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus
+ id
+ window
+ include-sys?)
+ topology-conf (from-json (.get_topology_conf topo-page-info))
+ msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
(merge
- (unpack-topology-page-info topo-page-info window)
- (topology-summary summ)
++ (unpack-topology-page-info topo-page-info window secure?)
{"user" user
"window" window
"windowHint" window-hint
"msgTimeout" msg-timeout
- "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
- "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys? secure?)
- "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys? secure?)
"configuration" topology-conf
- "visualizationTable" (stream-boxes visualizer-data)
- "replicationCount" replication-count}))))
-
-(defn spout-output-stats
- [stream-summary window]
- (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
- (for [[s stats] (stream-summary window)]
- {"stream" s
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "completeLatency" (float-str (:complete-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))})))
-
-(defn spout-executor-stats
- [topology-id executors window include-sys? secure?]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-spout-stats include-sys?)
- aggregate-spout-streams
- swap-map-order
- (get window)))]]
- {"id" (pretty-executor-info (.get_executor_info e))
- "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
- "uptime" (pretty-uptime-sec (.get_uptime_secs e))
- "host" (.get_host e)
- "port" (.get_port e)
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "completeLatency" (float-str (:complete-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))
- "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id secure?)}))
+ "visualizationTable" []}))))
(defn component-errors
- [errors-list topology-id]
+ [errors-list topology-id secure?]
(let [errors (->> errors-list
(sort-by #(.get_error_time_secs ^ErrorInfo %))
reverse)]
@@@ -500,188 -858,142 +557,204 @@@
{"time" (* 1000 (long (.get_error_time_secs e)))
"errorHost" (.get_host e)
"errorPort" (.get_port e)
- "errorWorkerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)
- "errorWorkerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id secure?)
++ "errorWorkerLogLink" (worker-log-link (.get_host e)
++ (.get_port e)
++ topology-id
++ secure?)
"errorLapsedSecs" (get-error-time e)
"error" (.get_error e)})}))
-(defn spout-stats
- [window ^TopologyInfo topology-info component executors include-sys? secure?]
- (let [window-hint (str " (" (window-hint window) ")")
- stats (get-filled-stats executors)
- stream-summary (-> stats (aggregate-spout-stats include-sys?))
- summary (-> stream-summary aggregate-spout-streams)]
- {"spoutSummary" (spout-summary-json
- (.get_id topology-info) component summary window)
- "outputStats" (spout-output-stats stream-summary window)
- "executorStats" (spout-executor-stats (.get_id topology-info)
- executors window include-sys? secure?)}))
-
-(defn bolt-summary
- [topology-id id stats window]
- (let [times (stats-times (:emitted stats))
- display-map (into {} (for [t times] [t pretty-uptime-sec]))
- display-map (assoc display-map ":all-time" (fn [_] "All time"))]
- (for [k (concat times [":all-time"])
- :let [disp ((display-map k) k)]]
- {"window" k
- "windowPretty" disp
- "emitted" (get-in stats [:emitted k])
- "transferred" (get-in stats [:transferred k])
- "executeLatency" (float-str (get-in stats [:execute-latencies k]))
- "executed" (get-in stats [:executed k])
- "processLatency" (float-str (get-in stats [:process-latencies k]))
- "acked" (get-in stats [:acked k])
- "failed" (get-in stats [:failed k])})))
-
-(defn bolt-output-stats
- [stream-summary window]
- (let [stream-summary (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:emitted :transferred])
- swap-map-order)]
- (for [[s stats] stream-summary]
- {"stream" s
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))})))
-
-(defn bolt-input-stats
- [stream-summary window]
- (let [stream-summary
- (-> stream-summary
- swap-map-order
- (get window)
- (select-keys [:acked :failed :process-latencies
- :executed :execute-latencies])
- swap-map-order)]
- (for [[^GlobalStreamId s stats] stream-summary]
- {"component" (.get_componentId s)
- "encodedComponent" (url-encode (.get_componentId s))
- "stream" (.get_streamId s)
- "executeLatency" (float-str (:execute-latencies stats))
- "processLatency" (float-str (:process-latencies stats))
- "executed" (nil-to-zero (:executed stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))})))
-
-(defn bolt-executor-stats
- [topology-id executors window include-sys? secure?]
- (for [^ExecutorSummary e executors
- :let [stats (.get_stats e)
- stats (if stats
- (-> stats
- (aggregate-bolt-stats include-sys?)
- (aggregate-bolt-streams)
- swap-map-order
- (get window)))]]
- {"id" (pretty-executor-info (.get_executor_info e))
- "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
- "uptime" (pretty-uptime-sec (.get_uptime_secs e))
- "host" (.get_host e)
- "port" (.get_port e)
- "emitted" (nil-to-zero (:emitted stats))
- "transferred" (nil-to-zero (:transferred stats))
- "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
- "executeLatency" (float-str (:execute-latencies stats))
- "executed" (nil-to-zero (:executed stats))
- "processLatency" (float-str (:process-latencies stats))
- "acked" (nil-to-zero (:acked stats))
- "failed" (nil-to-zero (:failed stats))
- "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id secure?)}))
-
-(defn bolt-stats
- [window ^TopologyInfo topology-info component executors include-sys? secure?]
- (let [window-hint (str " (" (window-hint window) ")")
- stats (get-filled-stats executors)
- stream-summary (-> stats (aggregate-bolt-stats include-sys?))
- summary (-> stream-summary aggregate-bolt-streams)]
- {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
- "inputStats" (bolt-input-stats stream-summary window)
- "outputStats" (bolt-output-stats stream-summary window)
- "executorStats" (bolt-executor-stats
- (.get_id topology-info) executors window include-sys? secure?)}))
+(defmulti unpack-comp-agg-stat
+  "Turns one [window stats] pair into a string-keyed map. Dispatches on the
+  ComponentType reported by the ComponentAggregateStats in the pair."
+  (fn [[_window ^ComponentAggregateStats agg]]
+    (.get_type agg)))
+
+(defmethod unpack-comp-agg-stat ComponentType/BOLT
+  [[window ^ComponentAggregateStats agg]]
+  ;; Bolt row for one window: the common counters plus execute/process
+  ;; latency, executed count and capacity from the bolt-specific stats.
+  (let [^CommonAggregateStats common (.get_common_stats agg)
+        ^BoltAggregateStats bolt (.. agg get_specific_stats get_bolt)]
+    {"window" window
+     "windowPretty" (window-hint window)
+     "emitted" (.get_emitted common)
+     "transferred" (.get_transferred common)
+     "acked" (.get_acked common)
+     "failed" (.get_failed common)
+     "executeLatency" (float-str (.get_execute_latency_ms bolt))
+     "processLatency" (float-str (.get_process_latency_ms bolt))
+     "executed" (.get_executed bolt)
+     "capacity" (float-str (.get_capacity bolt))}))
+
+(defmethod unpack-comp-agg-stat ComponentType/SPOUT
+  [[window ^ComponentAggregateStats agg]]
+  ;; Spout row for one window: the common counters plus the complete latency
+  ;; from the spout-specific stats.
+  (let [^CommonAggregateStats common (.get_common_stats agg)
+        ^SpoutAggregateStats spout (.. agg get_specific_stats get_spout)]
+    {"window" window
+     "windowPretty" (window-hint window)
+     "emitted" (.get_emitted common)
+     "transferred" (.get_transferred common)
+     "acked" (.get_acked common)
+     "failed" (.get_failed common)
+     "completeLatency" (float-str (.get_complete_latency_ms spout))}))
+
+(defn- unpack-bolt-input-stat
+  "Turns one [GlobalStreamId stats] pair into the string-keyed map for a
+  bolt input row: source component and stream, latencies, and the
+  executed/acked/failed counts (nil counts become zero)."
+  [[^GlobalStreamId stream ^ComponentAggregateStats agg]]
+  (let [^CommonAggregateStats common (.get_common_stats agg)
+        ^BoltAggregateStats bolt (.. agg get_specific_stats get_bolt)
+        source-id (.get_componentId stream)]
+    {"component" source-id
+     "encodedComponentId" (url-encode source-id)
+     "stream" (.get_streamId stream)
+     "executeLatency" (float-str (.get_execute_latency_ms bolt))
+     "processLatency" (float-str (.get_process_latency_ms bolt))
+     "executed" (nil-to-zero (.get_executed bolt))
+     "acked" (nil-to-zero (.get_acked common))
+     "failed" (nil-to-zero (.get_failed common))}))
+
+(defmulti unpack-comp-output-stat
+  "Turns one [stream-id stats] pair into a string-keyed output-stream map.
+  Dispatches on the ComponentType reported by the ComponentAggregateStats."
+  (fn [[_stream-id ^ComponentAggregateStats agg]]
+    (.get_type agg)))
+
+(defmethod unpack-comp-output-stat ComponentType/BOLT
+  [[stream-id ^ComponentAggregateStats agg]]
+  ;; Bolt output rows expose only emitted/transferred (nil becomes zero).
+  (let [^CommonAggregateStats common (.get_common_stats agg)
+        emitted (nil-to-zero (.get_emitted common))
+        transferred (nil-to-zero (.get_transferred common))]
+    {"stream" stream-id
+     "emitted" emitted
+     "transferred" transferred}))
+
+(defmethod unpack-comp-output-stat ComponentType/SPOUT
+  [[stream-id ^ComponentAggregateStats agg]]
+  ;; Spout output rows add complete latency and acked/failed counts to the
+  ;; emitted/transferred counters (nil counts become zero).
+  (let [^CommonAggregateStats common (.get_common_stats agg)
+        ^SpoutAggregateStats spout (.. agg get_specific_stats get_spout)]
+    {"stream" stream-id
+     "emitted" (nil-to-zero (.get_emitted common))
+     "transferred" (nil-to-zero (.get_transferred common))
+     "completeLatency" (float-str (.get_complete_latency_ms spout))
+     "acked" (nil-to-zero (.get_acked common))
+     "failed" (nil-to-zero (.get_failed common))}))
+
+(defmulti unpack-comp-exec-stat ; dispatches on the ComponentType of the stats wrapped by the third argument
- (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
++ (fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas)))) ; NOTE(review): the parameter is hinted ComponentAggregateStats but re-hinted as ExecutorAggregateStats at the call; the methods declare ExecutorAggregateStats - confirm the parameter hint
+
+(defmethod unpack-comp-exec-stat ComponentType/BOLT
- [topology-id ^ExecutorAggregateStats eas]
++ [topology-id secure? ^ExecutorAggregateStats eas]
+ (let [^ExecutorSummary summ (.get_exec_summary eas)
+ ^ExecutorInfo info (.get_executor_info summ)
+ ^ComponentAggregateStats stats (.get_stats eas)
+ ^SpecificAggregateStats ss (.get_specific_stats stats)
+ ^BoltAggregateStats bas (.get_bolt ss)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ host (.get_host summ)
+ port (.get_port summ)
+ exec-id (pretty-executor-info info)]
+ {"id" exec-id
+ "encodedId" (url-encode exec-id)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+ "host" host
+ "port" port
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "capacity" (float-str (nil-to-zero (.get_capacity bas)))
+ "executeLatency" (float-str (.get_execute_latency_ms bas))
+ "executed" (nil-to-zero (.get_executed bas))
+ "processLatency" (float-str (.get_process_latency_ms bas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))
- "workerLogLink" (worker-log-link host port topology-id)}))
++ "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+(defmethod unpack-comp-exec-stat ComponentType/SPOUT
- [topology-id ^ExecutorAggregateStats eas]
++ [topology-id secure? ^ExecutorAggregateStats eas]
+ (let [^ExecutorSummary summ (.get_exec_summary eas)
+ ^ExecutorInfo info (.get_executor_info summ)
+ ^ComponentAggregateStats stats (.get_stats eas)
+ ^SpecificAggregateStats ss (.get_specific_stats stats)
+ ^SpoutAggregateStats sas (.get_spout ss)
+ ^CommonAggregateStats cas (.get_common_stats stats)
+ host (.get_host summ)
+ port (.get_port summ)
+ exec-id (pretty-executor-info info)]
+ {"id" exec-id
+ "encodedId" (url-encode exec-id)
+ "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+ "host" host
+ "port" port
+ "emitted" (nil-to-zero (.get_emitted cas))
+ "transferred" (nil-to-zero (.get_transferred cas))
+ "completeLatency" (float-str (.get_complete_latency_ms sas))
+ "acked" (nil-to-zero (.get_acked cas))
+ "failed" (nil-to-zero (.get_failed cas))
- "workerLogLink" (worker-log-link host port topology-id)}))
++ "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+(defmulti unpack-component-page-info
+ "Unpacks component-specific info to clojure data structures"
+ (fn [^ComponentPageInfo info & _]
+ (.get_component_type info)))
+
+(defmethod unpack-component-page-info ComponentType/BOLT
- [^ComponentPageInfo info topology-id window include-sys?]
++ [^ComponentPageInfo info topology-id window include-sys? secure?]
+ (merge
+ {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
+ "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
+ "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
- "executorStats" (map (partial unpack-comp-exec-stat topology-id)
++ "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
+ (.get_exec_stats info))}
- (-> info .get_errors (component-errors topology-id))))
++ (-> info .get_errors (component-errors topology-id secure?))))
+
+(defmethod unpack-component-page-info ComponentType/SPOUT
- [^ComponentPageInfo info topology-id window include-sys?]
++ [^ComponentPageInfo info topology-id window include-sys? secure?]
+ (merge
+ {"spoutSummary" (map unpack-comp-agg-stat (.get_window_to_stats info))
+ "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
- "executorStats" (map (partial unpack-comp-exec-stat topology-id)
++ "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
+ (.get_exec_stats info))}
- (-> info .get_errors (component-errors topology-id))))
++ (-> info .get_errors (component-errors topology-id secure?))))
(defn component-page
- [topology-id component window include-sys? user]
- (with-nimbus nimbus
+ [topology-id component window include-sys? user secure?]
+ (thrift/with-configured-nimbus-connection nimbus
- (let [window (if window window ":all-time")
- summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
- topology (.getTopology ^Nimbus$Client nimbus topology-id)
- topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))
- type (component-type topology component)
- summs (component-task-summs summ topology component)
- spec (cond (= type :spout) (spout-stats window summ component summs include-sys? secure?)
- (= type :bolt) (bolt-stats window summ component summs include-sys? secure?))
- errors (component-errors (get (.get_errors summ) component) topology-id secure?)
- component->debug (.get_component_debug summ)
- debug-options (get component->debug component (get component->debug topology-id))]
- (merge
- {"user" user
- "id" component
- "encodedId" (url-encode component)
- "name" (.get_name summ)
- "executors" (count summs)
- "tasks" (sum-tasks summs)
- "topologyId" topology-id
- "topologyStatus" (.get_status summ)
- "encodedTopologyId" (url-encode topology-id)
- "window" window
- "componentType" (name type)
- "windowHint" (window-hint window)
- "debug" (if (not-nil? debug-options) (.is_enable debug-options) false)
- "samplingPct" (if (not-nil? debug-options) (.get_samplingpct debug-options) 10)
- "eventLogLink" (event-log-link topology-id summ topology component secure?)}
- spec errors))))
+ (let [window (or window ":all-time")
+ window-hint (window-hint window)
+ comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus
+ topology-id
+ component
+ window
+ include-sys?)
+ topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus
+ topology-id))
- msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
++ msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
++ [debugEnabled
++ samplingPct] (if-let [debug-opts (.get_debug_options comp-page-info)]
++ [(.is_enable debug-opts)
++ (.get_samplingpct debug-opts)])]
+ (assoc
+ (unpack-component-page-info comp-page-info
+ topology-id
+ window
- include-sys?)
++ include-sys?
++ secure?)
+ "user" user
+ "id" component
+ "encodedId" (url-encode component)
+ "name" (.get_topology_name comp-page-info)
+ "executors" (.get_num_executors comp-page-info)
+ "tasks" (.get_num_tasks comp-page-info)
+ "topologyId" topology-id
++ "topologyStatus" (.get_topology_status comp-page-info)
+ "encodedTopologyId" (url-encode topology-id)
+ "window" window
+ "componentType" (-> comp-page-info .get_component_type str lower-case)
- "windowHint" window-hint))))
++ "windowHint" window-hint
++ "debug" (or debugEnabled false)
++ "samplingPct" (or samplingPct 10)
++ "eventLogLink" (event-log-link topology-id
++ component
++ (.get_eventlog_host comp-page-info)
++ (.get_eventlog_port comp-page-info)
++ secure?)))))
(defn topology-config [topology-id]
- (with-nimbus nimbus
+ (thrift/with-configured-nimbus-connection nimbus
- (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
+ (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
(defn topology-op-response [topology-id op]
{"topologyOperation" op,
http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------