You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/05 23:01:02 UTC

[30/37] storm git commit: Merge remote-tracking branch 'apache/master' into storm-820-agg-stats-on-nimbus

Merge remote-tracking branch 'apache/master' into storm-820-agg-stats-on-nimbus


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5d847945
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5d847945
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5d847945

Branch: refs/heads/master
Commit: 5d84794521b2f581d2ccf3b144f8c73cfb277e75
Parents: 5266a16 99285bb
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Tue Sep 29 13:52:53 2015 -0500
Committer: Derek Dagit <de...@yahoo-inc.com>
Committed: Tue Sep 29 13:52:53 2015 -0500

----------------------------------------------------------------------
 .gitignore                                      |    2 +-
 .travis.yml                                     |   14 +-
 CHANGELOG.md                                    |  343 +++--
 DEVELOPER.md                                    |   28 +-
 LICENSE                                         |    4 +-
 NOTICE                                          |    4 +-
 README.markdown                                 |   39 +-
 SECURITY.md                                     |  244 ++--
 STORM-UI-REST-API.md                            |   72 +-
 TODO                                            |  178 ---
 bin/storm-config.cmd                            |   18 +-
 bin/storm.py                                    |   85 +-
 conf/defaults.yaml                              |   20 +-
 conf/jaas_kerberos.conf                         |   17 +
 conf/log4j2.xml                                 |   32 +
 conf/logback.xml                                |   28 -
 dev-tools/github/__init__.py                    |  205 +--
 dev-tools/jira-github-join.py                   |   77 +-
 dev-tools/jira/__init__.py                      |  435 +++---
 dev-tools/report/__init__.py                    |   14 +
 dev-tools/report/formatter.py                   |   68 +
 dev-tools/report/report.py                      |  252 ++++
 dev-tools/report/report_builder.py              |   86 ++
 dev-tools/storm-merge.py                        |    2 +-
 dev-tools/test-ns.py                            |   36 +-
 .../print-errors-from-clojure-test-reports.py   |   58 -
 .../travis/print-errors-from-test-reports.py    |   76 +
 dev-tools/travis/save-logs.py                   |   54 +
 dev-tools/travis/travis-build.sh                |   50 -
 dev-tools/travis/travis-install.sh              |   37 +
 dev-tools/travis/travis-script.sh               |   43 +
 doap_Storm.rdf                                  |    6 +-
 docs/_posts/2015-06-04-storm095-released.md     |   23 +
 .../2015-06-15-storm0100-beta-released.md       |  294 ++++
 docs/about/multi-language.md                    |    2 +-
 docs/documentation/Concepts.md                  |    4 +-
 docs/documentation/Documentation.md             |    4 +-
 docs/documentation/FAQ.md                       |    7 +-
 .../Guaranteeing-message-processing.md          |    4 +-
 docs/documentation/Maven.md                     |    4 +-
 docs/documentation/Metrics.md                   |    2 +-
 docs/documentation/Multilang-protocol.md        |    9 +-
 ...unning-topologies-on-a-production-cluster.md |    4 +-
 .../documentation/Setting-up-a-Storm-cluster.md |   10 +-
 docs/documentation/Structure-of-the-codebase.md |    2 +-
 docs/documentation/Troubleshooting.md           |   37 +
 docs/documentation/Tutorial.md                  |   16 +-
 ...nding-the-parallelism-of-a-Storm-topology.md |    5 +-
 .../nimbus_ha_leader_election_and_failover.png  |  Bin 0 -> 154316 bytes
 .../images/nimbus_ha_topology_submission.png    |  Bin 0 -> 134180 bytes
 docs/documentation/nimbus-ha-design.md          |  217 +++
 docs/downloads.html                             |   95 +-
 examples/storm-starter/README.markdown          |    8 +-
 examples/storm-starter/pom.xml                  |   31 +-
 .../jvm/storm/starter/BasicDRPCTopology.java    |    5 +-
 .../src/jvm/storm/starter/ReachTopology.java    |    4 +-
 .../storm/starter/TransactionalGlobalCount.java |    5 +-
 .../starter/trident/TridentKafkaWordCount.java  |  230 +++
 external/flux/.gitignore                        |   15 +
 external/flux/README.md                         |  834 +++++++++++
 external/flux/flux-core/pom.xml                 |   96 ++
 .../main/java/org/apache/storm/flux/Flux.java   |  263 ++++
 .../java/org/apache/storm/flux/FluxBuilder.java |  595 ++++++++
 .../apache/storm/flux/api/TopologySource.java   |   39 +
 .../org/apache/storm/flux/model/BeanDef.java    |   39 +
 .../apache/storm/flux/model/BeanReference.java  |   39 +
 .../org/apache/storm/flux/model/BoltDef.java    |   24 +
 .../storm/flux/model/ConfigMethodDef.java       |   62 +
 .../storm/flux/model/ExecutionContext.java      |   77 ++
 .../apache/storm/flux/model/GroupingDef.java    |   77 ++
 .../org/apache/storm/flux/model/IncludeDef.java |   54 +
 .../org/apache/storm/flux/model/ObjectDef.java  |   90 ++
 .../apache/storm/flux/model/PropertyDef.java    |   58 +
 .../org/apache/storm/flux/model/SpoutDef.java   |   24 +
 .../org/apache/storm/flux/model/StreamDef.java  |   64 +
 .../apache/storm/flux/model/TopologyDef.java    |  216 +++
 .../storm/flux/model/TopologySourceDef.java     |   36 +
 .../org/apache/storm/flux/model/VertexDef.java  |   36 +
 .../apache/storm/flux/parser/FluxParser.java    |  208 +++
 .../flux-core/src/main/resources/splash.txt     |    9 +
 .../org/apache/storm/flux/FluxBuilderTest.java  |   31 +
 .../org/apache/storm/flux/IntegrationTest.java  |   39 +
 .../java/org/apache/storm/flux/TCKTest.java     |  245 ++++
 .../multilang/MultilangEnvirontmentTest.java    |   89 ++
 .../apache/storm/flux/test/SimpleTopology.java  |   59 +
 .../storm/flux/test/SimpleTopologySource.java   |   52 +
 .../test/SimpleTopologyWithConfigParam.java     |   55 +
 .../org/apache/storm/flux/test/TestBolt.java    |   80 ++
 .../storm/flux/test/TridentTopologySource.java  |   71 +
 .../src/test/resources/configs/bad_hbase.yaml   |   98 ++
 .../resources/configs/config-methods-test.yaml  |   70 +
 .../resources/configs/diamond-topology.yaml     |   87 ++
 .../existing-topology-method-override.yaml      |   25 +
 .../existing-topology-reflection-config.yaml    |   24 +
 .../configs/existing-topology-reflection.yaml   |   24 +
 .../configs/existing-topology-trident.yaml      |   24 +
 .../resources/configs/existing-topology.yaml    |   23 +
 .../src/test/resources/configs/hdfs_test.yaml   |   97 ++
 .../test/resources/configs/include_test.yaml    |   25 +
 .../configs/invalid-existing-topology.yaml      |   33 +
 .../src/test/resources/configs/kafka_test.yaml  |  126 ++
 .../src/test/resources/configs/shell_test.yaml  |  104 ++
 .../test/resources/configs/simple_hbase.yaml    |  120 ++
 .../resources/configs/substitution-test.yaml    |  106 ++
 .../src/test/resources/configs/tck.yaml         |   95 ++
 .../src/test/resources/configs/test.properties  |   18 +
 .../flux-core/src/test/resources/log4j2.xml     |   34 +
 external/flux/flux-examples/README.md           |   66 +
 external/flux/flux-examples/pom.xml             |  141 ++
 .../storm/flux/examples/WordCountClient.java    |   74 +
 .../apache/storm/flux/examples/WordCounter.java |   71 +
 .../src/main/resources/hbase_bolt.properties    |   18 +
 .../src/main/resources/hdfs_bolt.properties     |   26 +
 .../src/main/resources/kafka_spout.yaml         |  136 ++
 .../src/main/resources/multilang.yaml           |   89 ++
 .../src/main/resources/simple_hbase.yaml        |   92 ++
 .../src/main/resources/simple_hdfs.yaml         |  105 ++
 .../src/main/resources/simple_wordcount.yaml    |   68 +
 external/flux/flux-ui/README.md                 |    3 +
 external/flux/flux-wrappers/pom.xml             |   51 +
 .../flux/wrappers/bolts/FluxShellBolt.java      |   56 +
 .../storm/flux/wrappers/bolts/LogInfoBolt.java  |   44 +
 .../flux/wrappers/spouts/FluxShellSpout.java    |   55 +
 .../main/resources/resources/randomsentence.js  |   93 ++
 .../main/resources/resources/splitsentence.py   |   24 +
 external/flux/pom.xml                           |  119 ++
 external/storm-elasticsearch/README.md          |   99 ++
 external/storm-elasticsearch/pom.xml            |  103 ++
 .../elasticsearch/ElasticsearchGetRequest.java  |   36 +
 .../elasticsearch/EsLookupResultOutput.java     |   43 +
 .../elasticsearch/bolt/AbstractEsBolt.java      |   81 ++
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |   75 +
 .../storm/elasticsearch/bolt/EsLookupBolt.java  |   81 ++
 .../elasticsearch/bolt/EsPercolateBolt.java     |   88 ++
 .../common/DefaultEsTupleMapper.java            |   42 +
 .../storm/elasticsearch/common/EsConfig.java    |   82 ++
 .../elasticsearch/common/EsTupleMapper.java     |   55 +
 .../common/StormElasticSearchClient.java        |   48 +
 .../common/TransportAddresses.java              |   72 +
 .../storm/elasticsearch/trident/EsState.java    |  113 ++
 .../elasticsearch/trident/EsStateFactory.java   |   55 +
 .../storm/elasticsearch/trident/EsUpdater.java  |   35 +
 .../bolt/AbstractEsBoltIntegrationTest.java     |   91 ++
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |   65 +
 .../elasticsearch/bolt/EsIndexBoltTest.java     |   69 +
 .../elasticsearch/bolt/EsIndexTopology.java     |  120 ++
 .../bolt/EsLookupBoltIntegrationTest.java       |  137 ++
 .../elasticsearch/bolt/EsLookupBoltTest.java    |  125 ++
 .../elasticsearch/bolt/EsPercolateBoltTest.java |   62 +
 .../elasticsearch/common/EsConfigTest.java      |   71 +
 .../storm/elasticsearch/common/EsConstants.java |   22 +
 .../storm/elasticsearch/common/EsTestUtil.java  |   75 +
 .../common/TransportAddressesTest.java          |   81 ++
 .../trident/EsStateFactoryTest.java             |   32 +
 .../trident/TridentEsTopology.java              |  135 ++
 external/storm-eventhubs/pom.xml                |   40 +-
 .../eventhubs/bolt/DefaultEventDataFormat.java  |   47 +
 .../storm/eventhubs/bolt/EventHubBolt.java      |   56 +-
 .../eventhubs/bolt/EventHubBoltConfig.java      |  109 ++
 .../storm/eventhubs/bolt/IEventDataFormat.java  |   28 +
 .../client/ConnectionStringBuilder.java         |  116 --
 .../storm/eventhubs/client/Constants.java       |   32 -
 .../storm/eventhubs/client/EventHubClient.java  |   92 --
 .../eventhubs/client/EventHubConsumerGroup.java |   72 -
 .../eventhubs/client/EventHubException.java     |   37 -
 .../eventhubs/client/EventHubReceiver.java      |  139 --
 .../eventhubs/client/EventHubSendClient.java    |   70 -
 .../storm/eventhubs/client/EventHubSender.java  |   95 --
 .../storm/eventhubs/client/SelectorFilter.java  |   38 -
 .../eventhubs/client/SelectorFilterWriter.java  |   64 -
 .../storm/eventhubs/samples/EventCount.java     |    5 +-
 .../storm/eventhubs/samples/EventHubLoop.java   |    9 +-
 .../eventhubs/spout/EventHubReceiverFilter.java |   56 -
 .../eventhubs/spout/EventHubReceiverImpl.java   |   49 +-
 .../storm/eventhubs/spout/EventHubSpout.java    |    5 +
 .../eventhubs/spout/EventHubSpoutConfig.java    |  126 +-
 .../eventhubs/spout/IEventHubReceiver.java      |    5 +-
 .../spout/IEventHubReceiverFilter.java          |   35 -
 .../eventhubs/spout/SimplePartitionManager.java |   11 +-
 .../spout/StaticPartitionCoordinator.java       |    2 +-
 .../trident/OpaqueTridentEventHubSpout.java     |    2 +-
 .../TransactionalTridentEventHubEmitter.java    |    2 +-
 .../TransactionalTridentEventHubSpout.java      |    2 +-
 .../trident/TridentPartitionManager.java        |   12 +-
 .../src/main/resources/config.properties        |   20 +-
 .../eventhubs/spout/EventHubReceiverMock.java   |   18 +-
 .../spout/SpoutOutputCollectorMock.java         |    5 +
 .../eventhubs/spout/TestEventHubSpout.java      |    4 +-
 external/storm-hbase/LICENSE                    |  202 ---
 external/storm-hbase/pom.xml                    |    2 +-
 .../org/apache/storm/hbase/common/Utils.java    |   10 +-
 external/storm-hdfs/README.md                   |    9 +
 external/storm-hdfs/pom.xml                     |   19 +-
 .../hdfs/common/security/HdfsSecurityUtil.java  |    5 +-
 .../ha/codedistributor/HDFSCodeDistributor.java |  101 ++
 .../apache/storm/hdfs/trident/HdfsState.java    |  392 ++++--
 .../trident/rotation/FileRotationPolicy.java    |   14 +
 .../rotation/FileSizeRotationPolicy.java        |   13 +
 .../hdfs/trident/rotation/NoRotationPolicy.java |   10 +
 .../trident/rotation/TimedRotationPolicy.java   |   31 +-
 .../storm/hdfs/trident/FixedBatchSpout.java     |    2 +-
 .../storm/hdfs/trident/HdfsStateTest.java       |  206 +++
 external/storm-hive/README.md                   |   18 +-
 external/storm-hive/pom.xml                     |   45 +
 .../org/apache/storm/hive/bolt/HiveBolt.java    |   77 +-
 .../apache/storm/hive/common/HiveOptions.java   |   11 +
 .../apache/storm/hive/common/HiveWriter.java    |    5 +-
 .../storm/hive/trident/HiveStateFactory.java    |   17 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   17 +
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  112 +-
 .../storm/hive/trident/TridentHiveTopology.java |    2 +-
 external/storm-jdbc/LICENSE                     |  202 ---
 external/storm-jdbc/README.md                   |   76 +-
 external/storm-jdbc/pom.xml                     |    4 +
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |   17 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |    5 +-
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |    5 +-
 .../org/apache/storm/jdbc/common/Column.java    |    7 +-
 .../storm/jdbc/common/ConnectionProvider.java   |   43 +
 .../jdbc/common/HikariCPConnectionProvider.java |   63 +
 .../apache/storm/jdbc/common/JdbcClient.java    |   19 +-
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |   17 +
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |   17 +
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |    6 +-
 .../storm/jdbc/trident/state/JdbcState.java     |   13 +-
 .../storm/jdbc/common/JdbcClientTest.java       |    5 +-
 .../jdbc/topology/AbstractUserTopology.java     |   17 +-
 .../jdbc/topology/UserPersistanceTopology.java  |   18 +-
 .../UserPersistanceTridentTopology.java         |    2 +-
 external/storm-jdbc/src/test/sql/test.sql       |   17 +
 external/storm-kafka/CHANGELOG.md               |   13 -
 external/storm-kafka/README.md                  |   57 +-
 external/storm-kafka/pom.xml                    |   18 +-
 .../ExponentialBackoffMsgRetryManager.java      |   25 +-
 .../jvm/storm/kafka/FailedMsgRetryManager.java  |    3 +
 .../src/jvm/storm/kafka/KafkaSpout.java         |   17 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   23 +-
 .../src/jvm/storm/kafka/SpoutConfig.java        |    3 +
 .../src/jvm/storm/kafka/ZkCoordinator.java      |    2 +-
 .../src/jvm/storm/kafka/ZkState.java            |    2 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |   92 +-
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |    4 +-
 .../trident/TransactionalTridentKafkaSpout.java |    4 +-
 .../ExponentialBackoffMsgRetryManagerTest.java  |   43 +-
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   32 +-
 .../src/test/storm/kafka/ZkCoordinatorTest.java |    1 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  106 +-
 external/storm-redis/LICENSE                    |  202 ---
 external/storm-redis/README.md                  |   21 +-
 .../storm/redis/bolt/AbstractRedisBolt.java     |   42 +
 .../storm/redis/bolt/RedisLookupBolt.java       |   23 +-
 .../apache/storm/redis/bolt/RedisStoreBolt.java |   21 +
 .../redis/common/config/JedisClusterConfig.java |   55 +-
 .../redis/common/config/JedisPoolConfig.java    |   66 +
 .../common/container/JedisClusterContainer.java |   18 +
 .../JedisCommandsContainerBuilder.java          |   15 +
 .../JedisCommandsInstanceContainer.java         |   12 +
 .../redis/common/container/JedisContainer.java  |   16 +
 .../common/mapper/RedisDataTypeDescription.java |   20 +
 .../redis/common/mapper/RedisLookupMapper.java  |    7 +-
 .../storm/redis/common/mapper/RedisMapper.java  |    9 +-
 .../redis/common/mapper/RedisStoreMapper.java   |    3 +
 .../storm/redis/common/mapper/TupleMapper.java  |   18 +-
 .../trident/state/AbstractRedisMapState.java    |  137 ++
 .../state/AbstractRedisStateQuerier.java        |   95 ++
 .../state/AbstractRedisStateUpdater.java        |   96 ++
 .../storm/redis/trident/state/KeyFactory.java   |   57 +
 .../storm/redis/trident/state/Options.java      |   40 +
 .../trident/state/RedisClusterMapState.java     |  284 ++--
 .../redis/trident/state/RedisClusterState.java  |   44 +-
 .../trident/state/RedisClusterStateQuerier.java |   75 +-
 .../trident/state/RedisClusterStateUpdater.java |   89 +-
 .../redis/trident/state/RedisMapState.java      |  333 +++--
 .../storm/redis/trident/state/RedisState.java   |   44 +-
 .../redis/trident/state/RedisStateQuerier.java  |   71 +-
 .../redis/trident/state/RedisStateUpdater.java  |   93 +-
 .../redis/trident/WordCountLookupMapper.java    |   57 +
 .../redis/trident/WordCountStoreMapper.java     |   39 +
 .../redis/trident/WordCountTridentRedis.java    |   12 +-
 .../trident/WordCountTridentRedisCluster.java   |   11 +-
 .../WordCountTridentRedisClusterMap.java        |    6 +-
 .../redis/trident/WordCountTridentRedisMap.java |    8 +-
 .../redis/trident/WordCountTupleMapper.java     |   16 -
 external/storm-solr/README.md                   |  201 +++
 external/storm-solr/pom.xml                     |   98 ++
 .../apache/storm/solr/bolt/SolrUpdateBolt.java  |  136 ++
 .../storm/solr/config/CountBasedCommit.java     |   59 +
 .../storm/solr/config/SolrCommitStrategy.java   |   30 +
 .../apache/storm/solr/config/SolrConfig.java    |   42 +
 .../storm/solr/mapper/SolrFieldsMapper.java     |  182 +++
 .../storm/solr/mapper/SolrJsonMapper.java       |  160 +++
 .../apache/storm/solr/mapper/SolrMapper.java    |   32 +
 .../storm/solr/mapper/SolrMapperException.java  |   24 +
 .../org/apache/storm/solr/schema/CopyField.java |   50 +
 .../org/apache/storm/solr/schema/Field.java     |   50 +
 .../org/apache/storm/solr/schema/FieldType.java |   63 +
 .../org/apache/storm/solr/schema/Schema.java    |  116 ++
 .../storm/solr/schema/SolrFieldTypeFinder.java  |  182 +++
 .../schema/builder/RestJsonSchemaBuilder.java   |   69 +
 .../solr/schema/builder/SchemaBuilder.java      |   27 +
 .../apache/storm/solr/trident/SolrState.java    |   67 +
 .../storm/solr/trident/SolrStateFactory.java    |   44 +
 .../apache/storm/solr/trident/SolrUpdater.java  |   33 +
 .../storm/solr/spout/SolrFieldsSpout.java       |   76 +
 .../apache/storm/solr/spout/SolrJsonSpout.java  |  120 ++
 .../storm/solr/topology/SolrFieldsTopology.java |   56 +
 .../storm/solr/topology/SolrJsonTopology.java   |   48 +
 .../storm/solr/topology/SolrTopology.java       |   82 ++
 .../solr/trident/SolrFieldsTridentTopology.java |   45 +
 .../solr/trident/SolrJsonTridentTopology.java   |   45 +
 .../org/apache/storm/solr/util/TestUtil.java    |   30 +
 log4j2/cluster.xml                              |   76 +
 log4j2/worker.xml                               |   77 ++
 logback/cluster.xml                             |   85 --
 logback/worker.xml                              |   41 -
 pom.xml                                         |   99 +-
 storm-core/pom.xml                              |  334 +++--
 storm-core/src/clj/backtype/storm/cluster.clj   |  152 +-
 .../clj/backtype/storm/command/get_errors.clj   |   52 +
 .../backtype/storm/command/shell_submission.clj |    9 +-
 storm-core/src/clj/backtype/storm/config.clj    |   30 +-
 storm-core/src/clj/backtype/storm/converter.clj |   49 +-
 .../backtype/storm/daemon/builtin_metrics.clj   |   25 +-
 .../src/clj/backtype/storm/daemon/common.clj    |   47 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |  182 ++-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   63 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  382 +++--
 .../clj/backtype/storm/daemon/supervisor.clj    |   85 +-
 .../src/clj/backtype/storm/daemon/task.clj      |    1 -
 .../src/clj/backtype/storm/daemon/worker.clj    |  141 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   23 +-
 .../src/clj/backtype/storm/messaging/loader.clj |   13 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |   23 +-
 storm-core/src/clj/backtype/storm/timer.clj     |    7 +-
 storm-core/src/clj/backtype/storm/tuple.clj     |    6 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  280 ++--
 .../src/clj/backtype/storm/ui/helpers.clj       |   12 +-
 storm-core/src/clj/backtype/storm/util.clj      |   27 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |  134 +-
 .../src/dev/drpc-simple-acl-test-scenario.yaml  |   17 +
 storm-core/src/jvm/backtype/storm/Config.java   |  224 ++-
 .../jvm/backtype/storm/ConfigValidation.java    |   46 +-
 .../src/jvm/backtype/storm/LogWriter.java       |   83 ++
 .../storm/codedistributor/ICodeDistributor.java |   56 +
 .../LocalFileSystemCodeDistributor.java         |  106 ++
 .../coordination/BatchSubtopologyBuilder.java   |    6 +-
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |    6 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |    9 +-
 .../backtype/storm/generated/Assignment.java    |  194 +--
 .../jvm/backtype/storm/generated/BoltStats.java |  342 ++---
 .../storm/generated/ClusterSummary.java         |  292 ++--
 .../storm/generated/ClusterWorkerHeartbeat.java |   54 +-
 .../storm/generated/ComponentPageInfo.java      |  652 +++++++--
 .../backtype/storm/generated/Credentials.java   |   46 +-
 .../backtype/storm/generated/DebugOptions.java  |  506 +++++++
 .../backtype/storm/generated/ExecutorStats.java |  162 +--
 .../storm/generated/LSApprovedWorkers.java      |   46 +-
 .../generated/LSSupervisorAssignments.java      |   50 +-
 .../storm/generated/LSWorkerHeartbeat.java      |   38 +-
 .../storm/generated/LocalAssignment.java        |   38 +-
 .../storm/generated/LocalStateData.java         |   50 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 1305 +++++++++++++++++-
 .../backtype/storm/generated/NimbusSummary.java |  796 +++++++++++
 .../jvm/backtype/storm/generated/NodeInfo.java  |   34 +-
 .../storm/generated/RebalanceOptions.java       |   46 +-
 .../backtype/storm/generated/SpoutStats.java    |  226 +--
 .../jvm/backtype/storm/generated/StormBase.java |  225 ++-
 .../storm/generated/SupervisorInfo.java         |  110 +-
 .../backtype/storm/generated/TopologyInfo.java  |  398 +++++-
 .../storm/generated/TopologyPageInfo.java       |  316 ++++-
 .../backtype/storm/generated/TopologyStats.java |  222 +--
 .../storm/generated/TopologySummary.java        |  109 +-
 .../storm/grouping/PartialKeyGrouping.java      |   27 +-
 .../storm/messaging/ConnectionWithStatus.java   |   17 +
 .../backtype/storm/messaging/netty/Client.java  |  476 +++----
 .../backtype/storm/messaging/netty/Context.java |   20 +-
 .../storm/messaging/netty/MessageBatch.java     |   24 +-
 .../storm/messaging/netty/MessageBuffer.java    |   58 +
 .../messaging/netty/SaslStormClientHandler.java |    4 +-
 .../messaging/netty/StormClientHandler.java     |   46 +
 .../netty/StormClientPipelineFactory.java       |    4 +-
 .../backtype/storm/metric/EventLoggerBolt.java  |   58 +
 .../storm/metric/FileBasedEventLogger.java      |  105 ++
 .../jvm/backtype/storm/metric/IEventLogger.java |   59 +
 .../storm/metric/MetricsConsumerBolt.java       |    2 +-
 .../jvm/backtype/storm/metric/SystemBolt.java   |   15 +
 .../backtype/storm/nimbus/ILeaderElector.java   |   60 +
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |   93 ++
 .../security/auth/SaslTransportPlugin.java      |    8 +-
 .../security/auth/ShellBasedGroupsMapping.java  |    2 +-
 .../auth/authorizer/DRPCAuthorizerBase.java     |   17 +
 .../authorizer/DRPCSimpleACLAuthorizer.java     |   20 +-
 .../authorizer/ImpersonationAuthorizer.java     |   19 +-
 .../kerberos/KerberosSaslTransportPlugin.java   |    5 +-
 .../security/auth/kerberos/NoOpTTrasport.java   |   40 +
 .../auth/kerberos/jaas_kerberos_cluster.conf    |   20 +-
 .../auth/kerberos/jaas_kerberos_launcher.conf   |   19 +
 .../serialization/BlowfishTupleSerializer.java  |    5 +-
 .../storm/spout/ISpoutOutputCollector.java      |    6 +-
 .../jvm/backtype/storm/spout/ShellSpout.java    |   10 +
 .../storm/spout/SpoutOutputCollector.java       |    5 +
 .../src/jvm/backtype/storm/task/ShellBolt.java  |   17 +-
 .../backtype/storm/task/TopologyContext.java    |   61 +-
 .../testing/OpaqueMemoryTransactionalSpout.java |    2 +-
 .../backtype/storm/testing/SpoutTracker.java    |    6 +
 .../storm/testing/TestWordBytesCounter.java     |   27 +
 .../backtype/storm/testing/TestWordCounter.java |    6 +-
 .../topology/BaseConfigurationDeclarer.java     |    2 +-
 .../ComponentConfigurationDeclarer.java         |    2 +-
 .../storm/topology/IBasicOutputCollector.java   |    5 +-
 .../storm/topology/TopologyBuilder.java         |   34 +-
 .../TransactionalTopologyBuilder.java           |   12 +-
 .../src/jvm/backtype/storm/tuple/Fields.java    |   21 +-
 .../src/jvm/backtype/storm/tuple/ITuple.java    |  126 +-
 .../src/jvm/backtype/storm/tuple/Tuple.java     |    3 +-
 .../utils/DisruptorBackpressureCallback.java    |   27 +
 .../backtype/storm/utils/DisruptorQueue.java    |  239 +++-
 .../jvm/backtype/storm/utils/NimbusClient.java  |   78 +-
 .../jvm/backtype/storm/utils/RateTracker.java   |  119 ++
 .../jvm/backtype/storm/utils/RotatingMap.java   |   12 +-
 .../jvm/backtype/storm/utils/ShellProcess.java  |   29 +-
 .../StormBoundedExponentialBackoffRetry.java    |    4 +-
 .../jvm/backtype/storm/utils/TimeCacheMap.java  |   60 +-
 .../backtype/storm/utils/TransferDrainer.java   |   62 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   67 +-
 .../storm/utils/WorkerBackpressureCallback.java |   26 +
 .../storm/utils/WorkerBackpressureThread.java   |   59 +
 .../src/jvm/storm/trident/TridentTopology.java  |   13 +-
 .../storm/trident/planner/SubtopologyBolt.java  |    2 +-
 .../storm/trident/spout/BatchSpoutExecutor.java |    2 +-
 .../jvm/storm/trident/spout/IBatchSpout.java    |    2 +-
 .../spout/IOpaquePartitionedTridentSpout.java   |    2 +-
 .../trident/spout/IPartitionedTridentSpout.java |    2 +-
 .../jvm/storm/trident/spout/ITridentSpout.java  |    6 +-
 .../OpaquePartitionedTridentSpoutExecutor.java  |   20 +-
 .../spout/PartitionedTridentSpoutExecutor.java  |   26 +-
 .../trident/spout/RichSpoutBatchExecutor.java   |   15 +-
 .../trident/spout/RichSpoutBatchTriggerer.java  |   18 +-
 .../trident/spout/TridentSpoutCoordinator.java  |    6 +-
 .../trident/spout/TridentSpoutExecutor.java     |   10 +-
 .../storm/trident/testing/FeederBatchSpout.java |    4 +-
 .../testing/FeederCommitterBatchSpout.java      |    2 +-
 .../storm/trident/testing/FixedBatchSpout.java  |    2 +-
 .../trident/topology/TridentBoltExecutor.java   |   17 +-
 .../topology/TridentTopologyBuilder.java        |   22 +-
 .../worker-launcher/.deps/worker-launcher.Po    |   16 +
 storm-core/src/py/storm/Nimbus-remote           |    7 +
 storm-core/src/py/storm/Nimbus.py               |  262 ++++
 storm-core/src/py/storm/ttypes.py               | 1268 +++++++++++------
 storm-core/src/storm.thrift                     |   32 +-
 storm-core/src/ui/public/component.html         |    8 +
 storm-core/src/ui/public/index.html             |   21 +
 storm-core/src/ui/public/js/script.js           |   80 +-
 .../templates/component-page-template.html      |   19 +-
 .../public/templates/index-page-template.html   |   59 +-
 .../templates/topology-page-template.html       |   26 +-
 storm-core/src/ui/public/topology.html          |   38 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   27 +-
 .../test/clj/backtype/storm/config_test.clj     |   65 +-
 .../test/clj/backtype/storm/grouping_test.clj   |   43 +-
 .../test/clj/backtype/storm/logviewer_test.clj  |    8 +-
 .../storm/messaging/netty_unit_test.clj         |    2 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |  309 +++--
 .../auth/DefaultHttpCredentialsPlugin_test.clj  |   15 +
 .../backtype/storm/security/auth/auth_test.clj  |    4 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   15 +
 .../storm/security/auth/drpc-auth-alice.jaas    |   17 +
 .../storm/security/auth/drpc-auth-bob.jaas      |   17 +
 .../storm/security/auth/drpc-auth-charlie.jaas  |   17 +
 .../storm/security/auth/drpc-auth-server.jaas   |   17 +
 .../storm/security/auth/drpc_auth_test.clj      |    6 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   14 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   33 +-
 .../test/clj/backtype/storm/utils_test.clj      |   12 -
 .../storm/topology/TopologyBuilderTest.java     |   48 +
 .../utils/DisruptorQueueBackpressureTest.java   |  115 ++
 .../storm/utils/DisruptorQueueTest.java         |    2 +-
 .../backtype/storm/utils/RateTrackerTest.java   |   62 +
 storm-core/test/resources/log4j2-test.xml       |   32 +
 storm-core/test/resources/logback-test.xml      |   26 -
 storm-core/test/resources/test_runner.clj       |  114 ++
 storm-dist/binary/LICENSE                       |  224 ++-
 storm-dist/binary/NOTICE                        |    4 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  132 +-
 484 files changed, 28370 insertions(+), 6657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/common.clj
index e3a10ef,9da0e4f..a30850d
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@@ -273,6 -293,26 +293,26 @@@
       (metrics-consumer-register-ids storm-conf)
       (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
  
+ ;; return the fields that event logger bolt expects
+ (defn eventlogger-bolt-fields []
+   [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID)  (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)]
+   )
+ 
+ (defn add-eventlogger! [storm-conf ^StormTopology ret]
+   (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS))
+         eventlogger-bolt (thrift/mk-bolt-spec* (eventlogger-inputs ret)
+                      (EventLoggerBolt.)
+                      {}
+                      :p num-executors
+                      :conf {TOPOLOGY-TASKS num-executors
+                             TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
+ 
+     (doseq [[_ component] (all-components ret)
+             :let [common (.get_common component)]]
+       (.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields))))
 -    (.put_to_bolts ret "__eventlogger" eventlogger-bolt)
++    (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
+     ))
+ 
  (defn add-metric-components! [storm-conf ^StormTopology topology]  
    (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
      (.put_to_bolts topology comp-id bolt-spec)))

http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 9e880bb,7ea515b..789190c
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -32,11 -35,10 +35,14 @@@
    (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
              ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
              KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
 -            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
 +            ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice
 +            ComponentPageInfo TopologyPageInfo])
    (:import [backtype.storm.daemon Shutdownable])
-   (:use [backtype.storm util config log timer])
-   (:require [backtype.storm [cluster :as cluster] [stats :as stats]])
+   (:use [backtype.storm util config log timer zookeeper])
 -  (:require [backtype.storm [cluster :as cluster] [stats :as stats] [converter :as converter]])
++  (:require [backtype.storm [cluster :as cluster]
++                            [converter :as converter]
++                            [stats :as stats]
++                            [tuple :as tuple]])
    (:require [clojure.set :as set])
    (:import [backtype.storm.daemon.common StormBase Assignment])
    (:use [backtype.storm.daemon common])
@@@ -1001,46 -1071,27 +1068,62 @@@
    (.prepare inimbus conf (master-inimbus-dir conf))
    (log-message "Starting Nimbus with conf " conf)
    (let [nimbus (nimbus-data conf inimbus)
 -       principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
 +        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
 +        get-common-topo-info
 +          (fn [^String storm-id operation]
 +            (let [storm-cluster-state (:storm-cluster-state nimbus)
 +                  topology-conf (try-read-storm-conf conf storm-id)
 +                  storm-name (topology-conf TOPOLOGY-NAME)
 +                  _ (check-authorization! nimbus
 +                                          storm-name
 +                                          topology-conf
 +                                          operation)
 +                  topology (try-read-storm-topology conf storm-id)
 +                  task->component (storm-task-info topology topology-conf)
 +                  base (.storm-base storm-cluster-state storm-id nil)
 +                  launch-time-secs (if base (:launch-time-secs base)
 +                                     (throw
 +                                       (NotAliveException. (str storm-id))))
 +                  assignment (.assignment-info storm-cluster-state storm-id nil)
 +                  beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
 +                                                 storm-id))
 +                  all-components (set (vals task->component))]
 +              {:storm-name storm-name
 +               :storm-cluster-state storm-cluster-state
 +               :all-components all-components
 +               :launch-time-secs launch-time-secs
 +               :assignment assignment
 +               :beats beats
 +               :topology topology
 +               :task->component task->component
 +               :base base}))
 +        get-last-error (fn [storm-cluster-state storm-id component-id]
 +                         (if-let [e (.last-error storm-cluster-state
 +                                                 storm-id
 +                                                 component-id)]
 +                           (doto (ErrorInfo. (:error e) (:time-secs e))
 +                             (.set_host (:host e))
 +                             (.set_port (:port e)))))]
      (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
+ 
+     ;add to nimbuses
+     (.add-nimbus-host! (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus))
+       (NimbusSummary.
+         (.getHost (:nimbus-host-port-info nimbus))
+         (.getPort (:nimbus-host-port-info nimbus))
+         (current-time-secs)
+         false ;is-leader
+         (str (VersionInfo/getVersion))))
+ 
+     (.addToLeaderLockQueue (:leader-elector nimbus))
      (cleanup-corrupt-topologies! nimbus)
-     (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
-       (transition! nimbus storm-id :startup))
+     (setup-code-distributor nimbus)
+ 
+     ;register call back for code-distributor
+     (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus)))
+     (when (is-leader nimbus :throw-exception false)
+       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
+         (transition! nimbus storm-id :startup)))
      (schedule-recurring (:timer nimbus)
                          0
                          (conf NIMBUS-MONITOR-FREQ-SECS)
@@@ -1364,63 -1465,11 +1495,94 @@@
              topo-info
            ))
  
--      (^TopologyInfo getTopologyInfo [this ^String storm-id]
++      (^TopologyInfo getTopologyInfo [this ^String topology-id]
          (.getTopologyInfoWithOpts this
--                                  storm-id
++                                  topology-id
                                    (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
  
 +      (^TopologyPageInfo getTopologyPageInfo
-         [this ^String storm-id ^String window ^boolean include-sys?]
-         (let [info (get-common-topo-info storm-id "getTopologyPageInfo")
++        [this ^String topo-id ^String window ^boolean include-sys?]
++        (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
 +
 +              exec->node+port (:executor->node+port (:assignment info))
 +              last-err-fn (partial get-last-error
 +                                   (:storm-cluster-state info)
-                                    storm-id)
-               topo-page-info (stats/agg-topo-execs-stats storm-id
++                                   topo-id)
++              topo-page-info (stats/agg-topo-execs-stats topo-id
 +                                                         exec->node+port
 +                                                         (:task->component info)
 +                                                         (:beats info)
 +                                                         (:topology info)
 +                                                         window
 +                                                         include-sys?
 +                                                         last-err-fn)]
 +          (when-let [owner (:owner (:base info))]
 +            (.set_owner topo-page-info owner))
-           (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)]
++          (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
 +            (.set_sched_status topo-page-info sched-status))
 +          (doto topo-page-info
 +            (.set_name (:storm-name info))
 +            (.set_status (extract-status-str (:base info)))
 +            (.set_uptime_secs (time-delta (:launch-time-secs info)))
 +            (.set_topology_conf (to-json (try-read-storm-conf conf
-                                                               storm-id))))))
++                                                              topo-id)))
++            (.set_replication_count
++              (.getReplicationCount (:code-distributor nimbus) topo-id)))
++          (when-let [debug-options
++                     (get-in info [:base :component->debug topo-id])]
++            (.set_debug_options
++              topo-page-info
++              (converter/thriftify-debugoptions debug-options)))
++          topo-page-info))
 +
 +      (^ComponentPageInfo getComponentPageInfo
 +        [this
-          ^String topology-id
++         ^String topo-id
 +         ^String component-id
 +         ^String window
 +         ^boolean include-sys?]
-         (let [info (get-common-topo-info topology-id "getComponentPageInfo")
++        (let [info (get-common-topo-info topo-id "getComponentPageInfo")
 +              {:keys [executor->node+port node->host]} (:assignment info)
 +              executor->host+port (map-val (fn [[node port]]
 +                                             [(node->host node) port])
 +                                           executor->node+port)
-               ret (stats/agg-comp-execs-stats executor->host+port
-                                               (:task->component info)
-                                               (:beats info)
-                                               window
-                                               include-sys?
-                                               topology-id
-                                               (:topology info)
-                                               component-id)]
-           (doto ret
++              comp-page-info (stats/agg-comp-execs-stats executor->host+port
++                                                         (:task->component info)
++                                                         (:beats info)
++                                                         window
++                                                         include-sys?
++                                                         topo-id
++                                                         (:topology info)
++                                                         component-id)]
++          (doto comp-page-info
 +            (.set_topology_name (:storm-name info))
 +            (.set_errors (get-errors (:storm-cluster-state info)
-                                      topology-id
-                                      component-id)))))
++                                     topo-id
++                                     component-id))
++            (.set_topology_status (extract-status-str (:base info))))
++          (when-let [debug-options
++                     (get-in info [:base :component->debug component-id])]
++            (.set_debug_options
++              comp-page-info
++              (converter/thriftify-debugoptions debug-options)))
++          ;; Add the event logger details.
++          (let [component->tasks (reverse-map (:task->component info))
++                eventlogger-tasks (sort (get component->tasks
++                                             EVENTLOGGER-COMPONENT-ID))
++                ;; Find the task the events from this component route to.
++                task-index (mod (tuple/list-hash-code [component-id])
++                                (count eventlogger-tasks))
++                task-id (nth eventlogger-tasks task-index)
++                eventlogger-exec (first (filter (fn [[start stop]]
++                                                  (between? task-id start stop))
++                                                (keys executor->host+port)))
++                [host port] (get executor->host+port eventlogger-exec)]
++            (if (and host port)
++              (doto comp-page-info
++                (.set_eventlog_host host)
++                (.set_eventlog_port port))))
++          comp-page-info))
 +
        Shutdownable
        (shutdown [this]
          (log-message "Shutting down master")

http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 0384316,414bfb1..4ae731d
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -21,20 -21,18 +21,21 @@@
          ring.middleware.multipart-params)
    (:use [ring.middleware.json :only [wrap-json-params]])
    (:use [hiccup core page-helpers])
-   (:use [backtype.storm config util log stats])
 -  (:use [backtype.storm config util log tuple zookeeper])
++  (:use [backtype.storm config util log stats tuple zookeeper])
    (:use [backtype.storm.ui helpers])
    (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                                ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
    (:use [clojure.string :only [blank? lower-case trim]])
-   (:import [backtype.storm.utils Utils])
+   (:import [backtype.storm.utils Utils]
+            [backtype.storm.generated NimbusSummary])
    (:import [backtype.storm.generated ExecutorSpecificStats
-             ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
+             ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
              ErrorInfo ClusterSummary SupervisorSummary TopologySummary
              Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
-             KillOptions GetInfoOptions NumErrorsChoice TopologyPageInfo
 -            KillOptions GetInfoOptions NumErrorsChoice DebugOptions])
++            KillOptions GetInfoOptions NumErrorsChoice DebugOptions TopologyPageInfo
 +            TopologyStats CommonAggregateStats ComponentAggregateStats
 +            ComponentType BoltAggregateStats SpoutAggregateStats
 +            ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo])
    (:import [backtype.storm.security.auth AuthUtils ReqContext])
    (:import [backtype.storm.generated AuthorizationException])
    (:import [backtype.storm.security.auth AuthUtils])
@@@ -115,11 -263,102 +108,28 @@@
    (let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
      (into (sorted-map) ret )))
  
- (defn worker-log-link [host port topology-id]
 -(defn error-subset
 -  [error-str]
 -  (apply str (take 200 error-str)))
 -
 -(defn most-recent-error
 -  [errors-list]
 -  (let [error (->> errors-list
 -                   (sort-by #(.get_error_time_secs ^ErrorInfo %))
 -                   reverse
 -                   first)]
 -     error))
 -
 -(defn component-task-summs
 -  [^TopologyInfo summ topology id]
 -  (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
 -        bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
 -        spout-comp-summs (group-by-comp spout-summs)
 -        bolt-comp-summs (group-by-comp bolt-summs)
 -        ret (if (contains? spout-comp-summs id)
 -              (spout-comp-summs id)
 -              (bolt-comp-summs id))]
 -    (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
 -
+ (defn logviewer-link [host fname secure?]
+   (if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT))
+     (url-format "https://%s:%s/log?file=%s"
+       host
+       (*STORM-CONF* LOGVIEWER-HTTPS-PORT)
+       fname)
+     (url-format "http://%s:%s/log?file=%s"
+       host
+       (*STORM-CONF* LOGVIEWER-PORT)
+       fname))
+   )
+ 
 -(defn executor-has-task-id? [task-id executor-info]
 -  (between? task-id (.get_task_start executor-info) (.get_task_end executor-info)))
 -
 -(defn get-host-port [task-id executor-summs]
 -  (let [ex-sum (some #(if (executor-has-task-id? task-id (.get_executor_info %)) %) executor-summs)]
 -    {:host (.get_host ex-sum) :port (.get_port ex-sum)}))
 -
 -(defn get-sorted-eventlogger-task-ids [executor-summs]
 -  (let [executor-infos (map #(.get_executor_info %) executor-summs)]
 -  (sort (flatten (map #(range (.get_task_start %) (inc (.get_task_end %))) executor-infos)))))
 -
 -(defn get-eventlogger-executor-summs [^TopologyInfo topology-info topology]
 -  (let [bolt-summs (filter (partial bolt-summary? topology) (.get_executors topology-info))]
 -        ((group-by-comp bolt-summs) "__eventlogger")))
 -
 -;
 -; The eventlogger uses fields grouping on the component-id so that events from same component
 -; always goes to the same event logger task. Here we use the same fields grouping
 -; to find the correct eventlogger task.
 -(defn get-mapped-task-id [sorted-task-ids ^String component-id]
 -  (nth sorted-task-ids (mod (list-hash-code [component-id]) (count sorted-task-ids))))
 -
+ (defn event-log-link
 -  [topology-id ^TopologyInfo topology-info topology component-id secure?]
 -  (let [executor-summs (get-eventlogger-executor-summs topology-info topology)
 -        sorted-task-ids (get-sorted-eventlogger-task-ids executor-summs)
 -        mapped-task-id (get-mapped-task-id sorted-task-ids component-id)
 -        host-port (get-host-port mapped-task-id executor-summs)
 -        fname (event-logs-filename topology-id (host-port :port))]
 -    (logviewer-link (host-port :host) fname secure?)))
++  [topology-id component-id host port secure?]
++  (logviewer-link host (event-logs-filename topology-id port) secure?))
+ 
+ (defn worker-log-link [host port topology-id secure?]
    (let [fname (logs-filename topology-id port)]
-     (url-format (str "http://%s:%s/log?file=%s")
-           host (*STORM-CONF* LOGVIEWER-PORT) fname)))
+     (logviewer-link host fname secure?)))
  
+ (defn nimbus-log-link [host port]
+   (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
 -
 -(defn compute-executor-capacity
 -  [^ExecutorSummary e]
 -  (let [stats (.get_stats e)
 -        stats (if stats
 -                (-> stats
 -                    (aggregate-bolt-stats true)
 -                    (aggregate-bolt-streams)
 -                    swap-map-order
 -                    (get "600")))
 -        uptime (nil-to-zero (.get_uptime_secs e))
 -        window (if (< uptime 600) uptime 600)
 -        executed (-> stats :executed nil-to-zero)
 -        latency (-> stats :execute-latencies nil-to-zero)]
 -    (if (> window 0)
 -      (div (* executed latency) (* 1000 window)))))
 -
 -(defn compute-bolt-capacity
 -  [executors]
 -  (->> executors
 -       (map compute-executor-capacity)
 -       (map nil-to-zero)
 -       (apply max)))
 -
  (defn get-error-time
    [error]
    (if error
@@@ -132,9 -371,10 +142,10 @@@
      ""))
  
  (defn get-error-port
 -  [error error-host top-id]
 +  [error]
    (if error
-     (.get_port ^ErrorInfo error)))
+     (.get_port ^ErrorInfo error)
+     ""))
  
  (defn get-error-host
    [error]
@@@ -142,6 -382,41 +153,12 @@@
      (.get_host ^ErrorInfo error)
      ""))
  
 -(defn spout-streams-stats
 -  [summs include-sys?]
 -  (let [stats-seq (get-filled-stats summs)]
 -    (aggregate-spout-streams
 -      (aggregate-spout-stats
 -        stats-seq include-sys?))))
 -
 -(defn bolt-streams-stats
 -  [summs include-sys?]
 -  (let [stats-seq (get-filled-stats summs)]
 -    (aggregate-bolt-streams
 -      (aggregate-bolt-stats
 -        stats-seq include-sys?))))
 -
 -(defn total-aggregate-stats
 -  [spout-summs bolt-summs include-sys?]
 -  (let [spout-stats (get-filled-stats spout-summs)
 -        bolt-stats (get-filled-stats bolt-summs)
 -        agg-spout-stats (-> spout-stats
 -                            (aggregate-spout-stats include-sys?)
 -                            aggregate-spout-streams)
 -        agg-bolt-stats (-> bolt-stats
 -                           (aggregate-bolt-stats include-sys?)
 -                           aggregate-bolt-streams)]
 -    (merge-with
 -      (fn [s1 s2]
 -        (merge-with + s1 s2))
 -      (select-keys
 -        agg-bolt-stats
 -        ;; Include only keys that will be used.  We want to count acked and
 -        ;; failed only for the "tuple trees," so we do not include those keys
 -        ;; from the bolt executors.
 -        [:emitted :transferred])
 -      agg-spout-stats)))
++(defn get-error-time
++  [error]
++  (if error
++    (.get_error_time_secs ^ErrorInfo error)
++    ""))
+ 
  (defn stats-times
    [stats-map]
    (sort-by #(Integer/parseInt %)
@@@ -353,145 -671,185 +401,154 @@@
         "tasksTotal" (.get_num_tasks t)
         "workersTotal" (.get_num_workers t)
         "executorsTotal" (.get_num_executors t)
+        "replicationCount" (.get_replication_count t)
         "schedulerInfo" (.get_sched_status t)})}))
  
 -(defn topology-stats [id window stats]
 +(defn topology-stats [window stats]
    (let [times (stats-times (:emitted stats))
          display-map (into {} (for [t times] [t pretty-uptime-sec]))
          display-map (assoc display-map ":all-time" (fn [_] "All time"))]
 -    (for [k (concat times [":all-time"])
 -          :let [disp ((display-map k) k)]]
 +    (for [w (concat times [":all-time"])
 +          :let [disp ((display-map w) w)]]
        {"windowPretty" disp
 -       "window" k
 -       "emitted" (get-in stats [:emitted k])
 -       "transferred" (get-in stats [:transferred k])
 -       "completeLatency" (float-str (get-in stats [:complete-latencies k]))
 -       "acked" (get-in stats [:acked k])
 -       "failed" (get-in stats [:failed k])})))
 -
 -(defn spout-comp [top-id summ-map errors window include-sys? secure?]
 -  (for [[id summs] summ-map
 -        :let [stats-seq (get-filled-stats summs)
 -              stats (aggregate-spout-streams
 -                     (aggregate-spout-stats
 -                      stats-seq include-sys?))
 -              last-error (most-recent-error (get errors id))
 -              error-host (get-error-host last-error)
 -              error-port (get-error-port last-error error-host top-id)]]
 -    {"spoutId" id
 -     "encodedSpoutId" (url-encode id)
 -     "executors" (count summs)
 -     "tasks" (sum-tasks summs)
 -     "emitted" (get-in stats [:emitted window])
 -     "transferred" (get-in stats [:transferred window])
 -     "completeLatency" (float-str (get-in stats [:complete-latencies window]))
 -     "acked" (get-in stats [:acked window])
 -     "failed" (get-in stats [:failed window])
 -     "errorHost" error-host
 -     "errorPort" error-port
 -     "errorWorkerLogLink" (worker-log-link error-host error-port top-id secure?)
 -     "errorLapsedSecs" (get-error-time last-error)
 -     "lastError" (get-error-data last-error)
 -     "time" (if last-error (* 1000 (long (.get_error_time_secs ^ErrorInfo last-error))))}))
 -
 -(defn bolt-comp [top-id summ-map errors window include-sys? secure?]
 -  (for [[id summs] summ-map
 -        :let [stats-seq (get-filled-stats summs)
 -              stats (aggregate-bolt-streams
 -                     (aggregate-bolt-stats
 -                      stats-seq include-sys?))
 -              last-error (most-recent-error (get errors id))
 -              error-host (get-error-host last-error)
 -              error-port (get-error-port last-error error-host top-id)]]
 -    {"boltId" id
 -     "encodedBoltId" (url-encode id)
 -     "executors" (count summs)
 -     "tasks" (sum-tasks summs)
 -     "emitted" (get-in stats [:emitted window])
 -     "transferred" (get-in stats [:transferred window])
 -     "capacity" (float-str (nil-to-zero (compute-bolt-capacity summs)))
 -     "executeLatency" (float-str (get-in stats [:execute-latencies window]))
 -     "executed" (get-in stats [:executed window])
 -     "processLatency" (float-str (get-in stats [:process-latencies window]))
 -     "acked" (get-in stats [:acked window])
 -     "failed" (get-in stats [:failed window])
 -     "errorHost" error-host
 -     "errorPort" error-port
 -     "errorWorkerLogLink" (worker-log-link error-host error-port top-id secure?)
 -     "errorLapsedSecs" (get-error-time last-error)
 -     "lastError" (get-error-data last-error)
 -     "time" (if last-error (* 1000 (long (.get_error_time_secs ^ErrorInfo last-error))))}))
 -
 -(defn topology-summary [^TopologyInfo summ]
 -  (let [executors (.get_executors summ)
 -        workers (set (for [^ExecutorSummary e executors]
 -                       [(.get_host e) (.get_port e)]))
 -        topology-id (.get_id summ)
 -        debug-options (get (.get_component_debug summ) topology-id)]
 -      {"id" topology-id
 -       "encodedId" (url-encode (.get_id summ))
 -       "owner" (.get_owner summ)
 -       "name" (.get_name summ)
 -       "status" (.get_status summ)
 -       "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
 -       "tasksTotal" (sum-tasks executors)
 -       "workersTotal" (count workers)
 -       "executorsTotal" (count executors)
 -       "schedulerInfo" (.get_sched_status summ)
 -       "debug" (if (not-nil? debug-options) (.is_enable debug-options) false)
 -       "samplingPct" (if (not-nil? debug-options) (.get_samplingpct debug-options) 10)
 -       "replicationCount" (.get_replication_count summ)}))
 -
 -(defn spout-summary-json [topology-id id stats window]
 -  (let [times (stats-times (:emitted stats))
 -        display-map (into {} (for [t times] [t pretty-uptime-sec]))
 -        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
 -     (for [k (concat times [":all-time"])
 -           :let [disp ((display-map k) k)]]
 -       {"windowPretty" disp
 -        "window" k
 -        "emitted" (get-in stats [:emitted k])
 -        "transferred" (get-in stats [:transferred k])
 -        "completeLatency" (float-str (get-in stats [:complete-latencies k]))
 -        "acked" (get-in stats [:acked k])
 -        "failed" (get-in stats [:failed k])})))
 +       "window" w
 +       "emitted" (get-in stats [:emitted w])
 +       "transferred" (get-in stats [:transferred w])
 +       "completeLatency" (float-str (get-in stats [:complete-latencies w]))
 +       "acked" (get-in stats [:acked w])
 +       "failed" (get-in stats [:failed w])})))
 +
 +(defn build-visualization [id window include-sys? user]
-   (with-nimbus nimbus
++  (thrift/with-configured-nimbus-connection nimbus
 +    (let [window (if window window ":all-time")
 +          topology-info (->> (doto
 +                               (GetInfoOptions.)
 +                               (.set_num_err_choice NumErrorsChoice/ONE))
 +                             (.getTopologyInfoWithOpts ^Nimbus$Client nimbus
 +                                                       id))
 +          storm-topology (.getTopology ^Nimbus$Client nimbus id)
 +          spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
 +          bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
 +          spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
 +          bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
 +          bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries)
 +          id->spout-spec (.get_spouts storm-topology)
 +          id->bolt (.get_bolts storm-topology)
 +          visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec)
 +                                                     (hashmap-to-persistent id->bolt))
 +                                              spout-comp-id->executor-summaries
 +                                              bolt-comp-id->executor-summaries
 +                                              window
 +                                              id)]
 +       {"visualizationTable" (stream-boxes visualizer-data)})))
 +
 +(defn- get-error-json
-   [topo-id error-info]
++  [topo-id error-info secure?]
 +  (let [host (get-error-host error-info)
 +        port (get-error-port error-info)]
 +    {"lastError" (get-error-data error-info)
++     "errorTime" (get-error-time error-info)
 +     "errorHost" host
 +     "errorPort" port
 +     "errorLapsedSecs" (get-error-time error-info)
-      "errorWorkerLogLink" (worker-log-link host port topo-id)}))
++     "errorWorkerLogLink" (worker-log-link host port topo-id secure?)}))
 +
 +(defn- common-agg-stats-json
 +  "Returns a JSON representation of a common aggregated statistics."
 +  [^CommonAggregateStats common-stats]
 +  {"executors" (.get_num_executors common-stats)
 +   "tasks" (.get_num_tasks common-stats)
 +   "emitted" (.get_emitted common-stats)
 +   "transferred" (.get_transferred common-stats)
 +   "acked" (.get_acked common-stats)
 +   "failed" (.get_failed common-stats)})
 +
 +(defmulti comp-agg-stats-json
 +  "Returns a JSON representation of aggregated statistics."
-   (fn [_ [id ^ComponentAggregateStats s]] (.get_type s)))
++  (fn [_ _ [id ^ComponentAggregateStats s]] (.get_type s)))
 +
 +(defmethod comp-agg-stats-json ComponentType/SPOUT
-   [topo-id [id ^ComponentAggregateStats s]]
++  [topo-id secure? [id ^ComponentAggregateStats s]]
 +  (let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout)
 +        cs (.get_common_stats s)]
 +    (merge
 +      (common-agg-stats-json cs)
-       (get-error-json topo-id (.get_last_error s))
++      (get-error-json topo-id (.get_last_error s) secure?)
 +      {"spoutId" id
 +       "encodedSpoutId" (url-encode id)
 +       "completeLatency" (float-str (.get_complete_latency_ms ss))})))
 +
 +(defmethod comp-agg-stats-json ComponentType/BOLT
-   [topo-id [id ^ComponentAggregateStats s]]
++  [topo-id secure? [id ^ComponentAggregateStats s]]
 +  (let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt)
 +        cs (.get_common_stats s)]
 +    (merge
 +      (common-agg-stats-json cs)
-       (get-error-json topo-id (.get_last_error s))
++      (get-error-json topo-id (.get_last_error s) secure?)
 +      {"boltId" id
 +       "encodedBoltId" (url-encode id)
 +       "capacity" (float-str (.get_capacity ss))
 +       "executeLatency" (float-str (.get_execute_latency_ms ss))
 +       "executed" (.get_executed ss)
 +       "processLatency" (float-str (.get_process_latency_ms ss))})))
 +
 +(defn- unpack-topology-page-info
 +  "Unpacks the serialized object to data structures"
-   [^TopologyPageInfo topo-info window]
++  [^TopologyPageInfo topo-info window secure?]
 +  (let [id (.get_id topo-info)
 +        ^TopologyStats topo-stats (.get_topology_stats topo-info)
 +        stat->window->number
 +          {:emitted (.get_window_to_emitted topo-stats)
 +           :transferred (.get_window_to_transferred topo-stats)
 +           :complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
 +           :acked (.get_window_to_acked topo-stats)
 +           :failed (.get_window_to_failed topo-stats)}
-         topo-stats (topology-stats window stat->window->number)]
++        topo-stats (topology-stats window stat->window->number)
++        [debugEnabled
++         samplingPct] (if-let [debug-opts (.get_debug_options topo-info)]
++                        [(.is_enable debug-opts)
++                         (.get_samplingpct debug-opts)])]
 +    {"id" id
 +     "encodedId" (url-encode id)
 +     "owner" (.get_owner topo-info)
 +     "name" (.get_name topo-info)
 +     "status" (.get_status topo-info)
 +     "uptime" (pretty-uptime-sec (.get_uptime_secs topo-info))
 +     "tasksTotal" (.get_num_tasks topo-info)
 +     "workersTotal" (.get_num_workers topo-info)
 +     "executorsTotal" (.get_num_executors topo-info)
 +     "schedulerInfo" (.get_sched_status topo-info)
 +     "topologyStats" topo-stats
-      "spouts" (map (partial comp-agg-stats-json id)
++     "spouts" (map (partial comp-agg-stats-json id secure?)
 +                   (.get_id_to_spout_agg_stats topo-info))
-      "bolts" (map (partial comp-agg-stats-json id)
++     "bolts" (map (partial comp-agg-stats-json id secure?)
 +                  (.get_id_to_bolt_agg_stats topo-info))
-      "configuration" (.get_topology_conf topo-info)}))
++     "configuration" (.get_topology_conf topo-info)
++     "debug" (or debugEnabled false)
++     "samplingPct" (or samplingPct 10)
++     "replicationCount" (.get_replication_count topo-info)}))
  
- (defn topology-page [id window include-sys? user]
-   (with-nimbus nimbus
-     (let [window (or window ":all-time")
+ (defn topology-page [id window include-sys? user secure?]
+   (thrift/with-configured-nimbus-connection nimbus
+     (let [window (if window window ":all-time")
            window-hint (window-hint window)
 -          summ (->> (doto
 -                      (GetInfoOptions.)
 -                      (.set_num_err_choice NumErrorsChoice/ONE))
 -                    (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
 -          topology (.getTopology ^Nimbus$Client nimbus id)
 -          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))
 -          spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
 -          bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
 -          spout-comp-summs (group-by-comp spout-summs)
 -          bolt-comp-summs (group-by-comp bolt-summs)
 -          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
 -          name (.get_name summ)
 -          status (.get_status summ)
 -          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
 -          spouts (.get_spouts topology)
 -          bolts (.get_bolts topology)
 -          replication-count (.get_replication_count summ)
 -          visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
 -                                                     (hashmap-to-persistent bolts))
 -                                              spout-comp-summs
 -                                              bolt-comp-summs
 -                                              window
 -                                              id)]
 +          topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus
 +                                               id
 +                                               window
 +                                               include-sys?)
 +          topology-conf (from-json (.get_topology_conf topo-page-info))
 +          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
        (merge
-        (unpack-topology-page-info topo-page-info window)
 -       (topology-summary summ)
++       (unpack-topology-page-info topo-page-info window secure?)
         {"user" user
          "window" window
          "windowHint" window-hint
          "msgTimeout" msg-timeout
 -        "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
 -        "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys? secure?)
 -        "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys? secure?)
          "configuration" topology-conf
 -        "visualizationTable" (stream-boxes visualizer-data)
 -        "replicationCount" replication-count}))))
 -
 -(defn spout-output-stats
 -  [stream-summary window]
 -  (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
 -    (for [[s stats] (stream-summary window)]
 -      {"stream" s
 -       "emitted" (nil-to-zero (:emitted stats))
 -       "transferred" (nil-to-zero (:transferred stats))
 -       "completeLatency" (float-str (:complete-latencies stats))
 -       "acked" (nil-to-zero (:acked stats))
 -       "failed" (nil-to-zero (:failed stats))})))
 -
 -(defn spout-executor-stats
 -  [topology-id executors window include-sys? secure?]
 -  (for [^ExecutorSummary e executors
 -        :let [stats (.get_stats e)
 -              stats (if stats
 -                      (-> stats
 -                          (aggregate-spout-stats include-sys?)
 -                          aggregate-spout-streams
 -                          swap-map-order
 -                          (get window)))]]
 -    {"id" (pretty-executor-info (.get_executor_info e))
 -     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
 -     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
 -     "host" (.get_host e)
 -     "port" (.get_port e)
 -     "emitted" (nil-to-zero (:emitted stats))
 -     "transferred" (nil-to-zero (:transferred stats))
 -     "completeLatency" (float-str (:complete-latencies stats))
 -     "acked" (nil-to-zero (:acked stats))
 -     "failed" (nil-to-zero (:failed stats))
 -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id secure?)}))
 +        "visualizationTable" []}))))
  
  (defn component-errors
-   [errors-list topology-id]
+   [errors-list topology-id secure?]
    (let [errors (->> errors-list
                      (sort-by #(.get_error_time_secs ^ErrorInfo %))
                      reverse)]
@@@ -500,188 -858,142 +557,204 @@@
         {"time" (* 1000 (long (.get_error_time_secs e)))
          "errorHost" (.get_host e)
          "errorPort"  (.get_port e)
-         "errorWorkerLogLink"  (worker-log-link (.get_host e) (.get_port e) topology-id)
 -        "errorWorkerLogLink"  (worker-log-link (.get_host e) (.get_port e) topology-id secure?)
++        "errorWorkerLogLink"  (worker-log-link (.get_host e)
++                                               (.get_port e)
++                                               topology-id
++                                               secure?)
          "errorLapsedSecs" (get-error-time e)
          "error" (.get_error e)})}))
  
 -(defn spout-stats
 -  [window ^TopologyInfo topology-info component executors include-sys? secure?]
 -  (let [window-hint (str " (" (window-hint window) ")")
 -        stats (get-filled-stats executors)
 -        stream-summary (-> stats (aggregate-spout-stats include-sys?))
 -        summary (-> stream-summary aggregate-spout-streams)]
 -    {"spoutSummary" (spout-summary-json
 -                      (.get_id topology-info) component summary window)
 -     "outputStats" (spout-output-stats stream-summary window)
 -     "executorStats" (spout-executor-stats (.get_id topology-info)
 -                                           executors window include-sys? secure?)}))
 -
 -(defn bolt-summary
 -  [topology-id id stats window]
 -  (let [times (stats-times (:emitted stats))
 -        display-map (into {} (for [t times] [t pretty-uptime-sec]))
 -        display-map (assoc display-map ":all-time" (fn [_] "All time"))]
 -    (for [k (concat times [":all-time"])
 -          :let [disp ((display-map k) k)]]
 -      {"window" k
 -       "windowPretty" disp
 -       "emitted" (get-in stats [:emitted k])
 -       "transferred" (get-in stats [:transferred k])
 -       "executeLatency" (float-str (get-in stats [:execute-latencies k]))
 -       "executed" (get-in stats [:executed k])
 -       "processLatency" (float-str (get-in stats [:process-latencies k]))
 -       "acked" (get-in stats [:acked k])
 -       "failed" (get-in stats [:failed k])})))
 -
 -(defn bolt-output-stats
 -  [stream-summary window]
 -  (let [stream-summary (-> stream-summary
 -                           swap-map-order
 -                           (get window)
 -                           (select-keys [:emitted :transferred])
 -                           swap-map-order)]
 -    (for [[s stats] stream-summary]
 -      {"stream" s
 -        "emitted" (nil-to-zero (:emitted stats))
 -        "transferred" (nil-to-zero (:transferred stats))})))
 -
 -(defn bolt-input-stats
 -  [stream-summary window]
 -  (let [stream-summary
 -        (-> stream-summary
 -            swap-map-order
 -            (get window)
 -            (select-keys [:acked :failed :process-latencies
 -                          :executed :execute-latencies])
 -            swap-map-order)]
 -    (for [[^GlobalStreamId s stats] stream-summary]
 -      {"component" (.get_componentId s)
 -       "encodedComponent" (url-encode (.get_componentId s))
 -       "stream" (.get_streamId s)
 -       "executeLatency" (float-str (:execute-latencies stats))
 -       "processLatency" (float-str (:process-latencies stats))
 -       "executed" (nil-to-zero (:executed stats))
 -       "acked" (nil-to-zero (:acked stats))
 -       "failed" (nil-to-zero (:failed stats))})))
 -
 -(defn bolt-executor-stats
 -  [topology-id executors window include-sys? secure?]
 -  (for [^ExecutorSummary e executors
 -        :let [stats (.get_stats e)
 -              stats (if stats
 -                      (-> stats
 -                          (aggregate-bolt-stats include-sys?)
 -                          (aggregate-bolt-streams)
 -                          swap-map-order
 -                          (get window)))]]
 -    {"id" (pretty-executor-info (.get_executor_info e))
 -     "encodedId" (url-encode (pretty-executor-info (.get_executor_info e)))
 -     "uptime" (pretty-uptime-sec (.get_uptime_secs e))
 -     "host" (.get_host e)
 -     "port" (.get_port e)
 -     "emitted" (nil-to-zero (:emitted stats))
 -     "transferred" (nil-to-zero (:transferred stats))
 -     "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
 -     "executeLatency" (float-str (:execute-latencies stats))
 -     "executed" (nil-to-zero (:executed stats))
 -     "processLatency" (float-str (:process-latencies stats))
 -     "acked" (nil-to-zero (:acked stats))
 -     "failed" (nil-to-zero (:failed stats))
 -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id secure?)}))
 -
 -(defn bolt-stats
 -  [window ^TopologyInfo topology-info component executors include-sys? secure?]
 -  (let [window-hint (str " (" (window-hint window) ")")
 -        stats (get-filled-stats executors)
 -        stream-summary (-> stats (aggregate-bolt-stats include-sys?))
 -        summary (-> stream-summary aggregate-bolt-streams)]
 -    {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
 -     "inputStats" (bolt-input-stats stream-summary window)
 -     "outputStats" (bolt-output-stats stream-summary window)
 -     "executorStats" (bolt-executor-stats
 -                       (.get_id topology-info) executors window include-sys? secure?)}))
 +(defmulti unpack-comp-agg-stat
 +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
 +
 +(defmethod unpack-comp-agg-stat ComponentType/BOLT
 +  [[window ^ComponentAggregateStats s]]
 +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
 +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
 +        ^BoltAggregateStats bolt-s (.get_bolt spec-s)]
 +    {"window" window
 +     "windowPretty" (window-hint window)
 +     "emitted" (.get_emitted comm-s)
 +     "transferred" (.get_transferred comm-s)
 +     "acked" (.get_acked comm-s)
 +     "failed" (.get_failed comm-s)
 +     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
 +     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
 +     "executed" (.get_executed bolt-s)
 +     "capacity" (float-str (.get_capacity bolt-s))}))
 +
 +(defmethod unpack-comp-agg-stat ComponentType/SPOUT
 +  [[window ^ComponentAggregateStats s]]
 +  (let [^CommonAggregateStats comm-s (.get_common_stats s)
 +        ^SpecificAggregateStats spec-s (.get_specific_stats s)
 +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
 +    {"window" window
 +     "windowPretty" (window-hint window)
 +     "emitted" (.get_emitted comm-s)
 +     "transferred" (.get_transferred comm-s)
 +     "acked" (.get_acked comm-s)
 +     "failed" (.get_failed comm-s)
 +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
 +
 +(defn- unpack-bolt-input-stat
 +  [[^GlobalStreamId s ^ComponentAggregateStats stats]]
 +  (let [^SpecificAggregateStats sas (.get_specific_stats stats)
 +        ^BoltAggregateStats bas (.get_bolt sas)
 +        ^CommonAggregateStats cas (.get_common_stats stats)
 +        comp-id (.get_componentId s)]
 +    {"component" comp-id
 +     "encodedComponentId" (url-encode comp-id)
 +     "stream" (.get_streamId s)
 +     "executeLatency" (float-str (.get_execute_latency_ms bas))
 +     "processLatency" (float-str (.get_process_latency_ms bas))
 +     "executed" (nil-to-zero (.get_executed bas))
 +     "acked" (nil-to-zero (.get_acked cas))
 +     "failed" (nil-to-zero (.get_failed cas))}))
 +
 +(defmulti unpack-comp-output-stat
 +  (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
 +
 +(defmethod unpack-comp-output-stat ComponentType/BOLT
 +  [[stream-id ^ComponentAggregateStats stats]]
 +  (let [^CommonAggregateStats cas (.get_common_stats stats)]
 +    {"stream" stream-id
 +     "emitted" (nil-to-zero (.get_emitted cas))
 +     "transferred" (nil-to-zero (.get_transferred cas))}))
 +
 +(defmethod unpack-comp-output-stat ComponentType/SPOUT
 +  [[stream-id ^ComponentAggregateStats stats]]
 +  (let [^CommonAggregateStats cas (.get_common_stats stats)
 +        ^SpecificAggregateStats spec-s (.get_specific_stats stats)
 +        ^SpoutAggregateStats spout-s (.get_spout spec-s)]
 +    {"stream" stream-id
 +     "emitted" (nil-to-zero (.get_emitted cas))
 +     "transferred" (nil-to-zero (.get_transferred cas))
 +     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
 +     "acked" (nil-to-zero (.get_acked cas))
 +     "failed" (nil-to-zero (.get_failed cas))}))
 +
 +(defmulti unpack-comp-exec-stat
-   (fn [_ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
++  (fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
 +
 +(defmethod unpack-comp-exec-stat ComponentType/BOLT
-   [topology-id ^ExecutorAggregateStats eas]
++  [topology-id secure? ^ExecutorAggregateStats eas]
 +  (let [^ExecutorSummary summ (.get_exec_summary eas)
 +        ^ExecutorInfo info (.get_executor_info summ)
 +        ^ComponentAggregateStats stats (.get_stats eas)
 +        ^SpecificAggregateStats ss (.get_specific_stats stats)
 +        ^BoltAggregateStats bas (.get_bolt ss)
 +        ^CommonAggregateStats cas (.get_common_stats stats)
 +        host (.get_host summ)
 +        port (.get_port summ)
 +        exec-id (pretty-executor-info info)]
 +    {"id" exec-id
 +     "encodedId" (url-encode exec-id)
 +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
 +     "host" host
 +     "port" port
 +     "emitted" (nil-to-zero (.get_emitted cas))
 +     "transferred" (nil-to-zero (.get_transferred cas))
 +     "capacity" (float-str (nil-to-zero (.get_capacity bas)))
 +     "executeLatency" (float-str (.get_execute_latency_ms bas))
 +     "executed" (nil-to-zero (.get_executed bas))
 +     "processLatency" (float-str (.get_process_latency_ms bas))
 +     "acked" (nil-to-zero (.get_acked cas))
 +     "failed" (nil-to-zero (.get_failed cas))
-      "workerLogLink" (worker-log-link host port topology-id)}))
++     "workerLogLink" (worker-log-link host port topology-id secure?)}))
 +
 +(defmethod unpack-comp-exec-stat ComponentType/SPOUT
-   [topology-id ^ExecutorAggregateStats eas]
++  [topology-id secure? ^ExecutorAggregateStats eas]
 +  (let [^ExecutorSummary summ (.get_exec_summary eas)
 +        ^ExecutorInfo info (.get_executor_info summ)
 +        ^ComponentAggregateStats stats (.get_stats eas)
 +        ^SpecificAggregateStats ss (.get_specific_stats stats)
 +        ^SpoutAggregateStats sas (.get_spout ss)
 +        ^CommonAggregateStats cas (.get_common_stats stats)
 +        host (.get_host summ)
 +        port (.get_port summ)
 +        exec-id (pretty-executor-info info)]
 +    {"id" exec-id
 +     "encodedId" (url-encode exec-id)
 +     "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
 +     "host" host
 +     "port" port
 +     "emitted" (nil-to-zero (.get_emitted cas))
 +     "transferred" (nil-to-zero (.get_transferred cas))
 +     "completeLatency" (float-str (.get_complete_latency_ms sas))
 +     "acked" (nil-to-zero (.get_acked cas))
 +     "failed" (nil-to-zero (.get_failed cas))
-      "workerLogLink" (worker-log-link host port topology-id)}))
++     "workerLogLink" (worker-log-link host port topology-id secure?)}))
 +
 +(defmulti unpack-component-page-info
 +  "Unpacks component-specific info to clojure data structures"
 +  (fn [^ComponentPageInfo info & _]
 +    (.get_component_type info)))
 +
 +(defmethod unpack-component-page-info ComponentType/BOLT
-   [^ComponentPageInfo info topology-id window include-sys?]
++  [^ComponentPageInfo info topology-id window include-sys? secure?]
 +  (merge
 +    {"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
 +     "inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
 +     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
-      "executorStats" (map (partial unpack-comp-exec-stat topology-id)
++     "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
 +                          (.get_exec_stats info))}
-     (-> info .get_errors (component-errors topology-id))))
++    (-> info .get_errors (component-errors topology-id secure?))))
 +
 +(defmethod unpack-component-page-info ComponentType/SPOUT
-   [^ComponentPageInfo info topology-id window include-sys?]
++  [^ComponentPageInfo info topology-id window include-sys? secure?]
 +  (merge
 +    {"spoutSummary" (map unpack-comp-agg-stat (.get_window_to_stats info))
 +     "outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
-      "executorStats" (map (partial unpack-comp-exec-stat topology-id)
++     "executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
 +                          (.get_exec_stats info))}
-     (-> info .get_errors (component-errors topology-id))))
++    (-> info .get_errors (component-errors topology-id secure?))))
  
  (defn component-page
-   [topology-id component window include-sys? user]
-   (with-nimbus nimbus
+   [topology-id component window include-sys? user secure?]
+   (thrift/with-configured-nimbus-connection nimbus
 -    (let [window (if window window ":all-time")
 -          summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
 -          topology (.getTopology ^Nimbus$Client nimbus topology-id)
 -          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))
 -          type (component-type topology component)
 -          summs (component-task-summs summ topology component)
 -          spec (cond (= type :spout) (spout-stats window summ component summs include-sys? secure?)
 -                     (= type :bolt) (bolt-stats window summ component summs include-sys? secure?))
 -          errors (component-errors (get (.get_errors summ) component) topology-id secure?)
 -          component->debug (.get_component_debug summ)
 -          debug-options (get component->debug component (get component->debug topology-id))]
 -      (merge
 -        {"user" user
 -         "id" component
 -         "encodedId" (url-encode component)
 -         "name" (.get_name summ)
 -         "executors" (count summs)
 -         "tasks" (sum-tasks summs)
 -         "topologyId" topology-id
 -         "topologyStatus" (.get_status summ)
 -         "encodedTopologyId" (url-encode topology-id)
 -         "window" window
 -         "componentType" (name type)
 -         "windowHint" (window-hint window)
 -         "debug" (if (not-nil? debug-options) (.is_enable debug-options) false)
 -         "samplingPct" (if (not-nil? debug-options) (.get_samplingpct debug-options) 10)
 -         "eventLogLink" (event-log-link topology-id summ topology component secure?)}
 -       spec errors))))
 +    (let [window (or window ":all-time")
 +          window-hint (window-hint window)
 +          comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus
 +                                                topology-id
 +                                                component
 +                                                window
 +                                                include-sys?)
 +          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus
 +                                                     topology-id))
-           msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
++          msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
++          [debugEnabled
++           samplingPct] (if-let [debug-opts (.get_debug_options comp-page-info)]
++                          [(.is_enable debug-opts)
++                           (.get_samplingpct debug-opts)])]
 +      (assoc
 +       (unpack-component-page-info comp-page-info
 +                                   topology-id
 +                                   window
-                                    include-sys?)
++                                   include-sys?
++                                   secure?)
 +       "user" user
 +       "id" component
 +       "encodedId" (url-encode component)
 +       "name" (.get_topology_name comp-page-info)
 +       "executors" (.get_num_executors comp-page-info)
 +       "tasks" (.get_num_tasks comp-page-info)
 +       "topologyId" topology-id
++       "topologyStatus" (.get_topology_status comp-page-info)
 +       "encodedTopologyId" (url-encode topology-id)
 +       "window" window
 +       "componentType" (-> comp-page-info .get_component_type str lower-case)
-        "windowHint" window-hint))))
++       "windowHint" window-hint
++       "debug" (or debugEnabled false)
++       "samplingPct" (or samplingPct 10)
++       "eventLogLink" (event-log-link topology-id
++                                      component
++                                      (.get_eventlog_host comp-page-info)
++                                      (.get_eventlog_port comp-page-info)
++                                      secure?)))))
  
  (defn topology-config [topology-id]
-   (with-nimbus nimbus
+   (thrift/with-configured-nimbus-connection nimbus
 -     (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
 +    (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
  
  (defn topology-op-response [topology-id op]
    {"topologyOperation" op,

http://git-wip-us.apache.org/repos/asf/storm/blob/5d847945/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------