You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:12 UTC
[06/38] storm git commit: Merge branch '1.x-branch' into metrics_v2
Merge branch '1.x-branch' into metrics_v2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/57a50f36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/57a50f36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/57a50f36
Branch: refs/heads/1.x-branch
Commit: 57a50f36df058937587cc08d81c235efc8dc720a
Parents: b8de0f3 c4a09d3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 30 17:07:02 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 30 17:07:02 2017 -0400
----------------------------------------------------------------------
.travis.yml | 5 +-
CHANGELOG.md | 1480 ------------------
DEVELOPER.md | 11 +-
bin/storm.cmd | 7 +
bin/storm.ps1 | 68 +
bin/storm.py | 9 +-
conf/storm-env.ps1 | 23 +
dev-tools/jira-github-join.py | 2 +-
dev-tools/jira/__init__.py | 285 ----
dev-tools/jira_github/__init__.py | 285 ++++
dev-tools/release_notes.py | 118 ++
dev-tools/report/report.py | 2 +-
docs/Metrics.md | 201 ++-
docs/SECURITY.md | 40 +-
docs/cgroups_in_storm.md | 71 -
docs/distcache-blobstore.md | 8 +-
docs/index.md | 1 -
docs/storm-kafka-client.md | 147 +-
examples/storm-elasticsearch-examples/pom.xml | 10 +
.../elasticsearch/bolt/EsIndexTopology.java | 6 +-
.../storm/elasticsearch/common/EsTestUtil.java | 14 +-
.../trident/TridentEsTopology.java | 3 +-
.../TridentKafkaClientWordCountNamedTopics.java | 27 +-
...identKafkaClientWordCountWildcardTopics.java | 5 +-
.../java/org/apache/storm/flux/TCKTest.java | 9 +
.../resources/configs/substitution-test.yaml | 3 +-
.../src/test/resources/configs/test.properties | 1 +
.../backends/trident/TestPlanCompiler.java | 2 +-
.../apache/storm/common/AbstractAutoCreds.java | 27 +-
.../apache/storm/hbase/security/AutoHBase.java | 39 +-
.../apache/storm/hdfs/security/AutoHDFS.java | 41 +-
.../apache/storm/hive/security/AutoHive.java | 39 +-
external/storm-elasticsearch/pom.xml | 44 +-
.../DefaultEsLookupResultOutput.java | 62 +
.../elasticsearch/ElasticsearchGetRequest.java | 36 -
.../elasticsearch/EsLookupResultOutput.java | 5 +-
.../elasticsearch/bolt/AbstractEsBolt.java | 53 +-
.../storm/elasticsearch/bolt/EsIndexBolt.java | 31 +-
.../storm/elasticsearch/bolt/EsLookupBolt.java | 49 +-
.../elasticsearch/bolt/EsPercolateBolt.java | 41 +-
.../common/DefaultEsTupleMapper.java | 19 +
.../storm/elasticsearch/common/EsConfig.java | 130 +-
.../elasticsearch/common/EsTupleMapper.java | 9 +
.../common/StormElasticSearchClient.java | 37 +-
.../common/TransportAddresses.java | 72 -
.../apache/storm/elasticsearch/doc/Index.java | 69 +
.../storm/elasticsearch/doc/IndexDoc.java | 43 +
.../storm/elasticsearch/doc/IndexItem.java | 91 ++
.../storm/elasticsearch/doc/IndexItemDoc.java | 42 +
.../apache/storm/elasticsearch/doc/Shards.java | 63 +
.../storm/elasticsearch/doc/SourceDoc.java | 43 +
.../response/BulkIndexResponse.java | 80 +
.../elasticsearch/response/LookupResponse.java | 63 +
.../response/PercolateResponse.java | 85 +
.../storm/elasticsearch/trident/EsState.java | 72 +-
.../elasticsearch/trident/EsStateFactory.java | 15 +-
.../bolt/AbstractEsBoltIntegrationTest.java | 68 +-
.../elasticsearch/bolt/AbstractEsBoltTest.java | 15 +-
.../elasticsearch/bolt/EsIndexBoltTest.java | 44 +-
.../bolt/EsLookupBoltIntegrationTest.java | 75 +-
.../elasticsearch/bolt/EsLookupBoltTest.java | 67 +-
.../elasticsearch/bolt/EsPercolateBoltTest.java | 62 +-
.../elasticsearch/common/EsConfigTest.java | 60 +-
.../storm/elasticsearch/common/EsTestUtil.java | 101 +-
.../common/TransportAddressesTest.java | 81 -
.../trident/EsStateFactoryTest.java | 2 +-
.../elasticsearch/trident/EsStateTest.java | 98 ++
.../src/test/resources/log4j2.xml | 33 +
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 23 +-
.../java/org/apache/storm/hdfs/bolt/Writer.java | 35 +
.../storm/hdfs/common/AbstractHDFSWriter.java | 16 +-
.../org/apache/storm/jms/spout/JmsSpout.java | 291 +++-
.../apache/storm/jms/spout/JmsSpoutTest.java | 81 +-
.../kafka/spout/EmptyKafkaTupleListener.java | 53 +
.../apache/storm/kafka/spout/KafkaSpout.java | 15 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 574 ++++---
.../storm/kafka/spout/KafkaTupleListener.java | 83 +
.../spout/ManualPartitionNamedSubscription.java | 78 -
.../ManualPartitionPatternSubscription.java | 76 -
.../spout/ManualPartitionSubscription.java | 72 +
.../storm/kafka/spout/ManualPartitioner.java | 4 +-
.../storm/kafka/spout/NamedSubscription.java | 4 +-
.../storm/kafka/spout/NamedTopicFilter.java | 68 +
.../storm/kafka/spout/PatternSubscription.java | 4 +-
.../storm/kafka/spout/PatternTopicFilter.java | 70 +
.../kafka/spout/SerializableDeserializer.java | 6 +-
.../apache/storm/kafka/spout/Subscription.java | 2 +-
.../apache/storm/kafka/spout/TopicFilter.java | 38 +
.../internal/KafkaConsumerFactoryDefault.java | 3 +-
.../kafka/spout/internal/OffsetManager.java | 9 +-
.../spout/trident/KafkaTridentSpoutManager.java | 3 +-
.../storm/kafka/spout/KafkaSpoutCommitTest.java | 131 ++
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 17 +-
.../storm/kafka/spout/KafkaSpoutEmitTest.java | 53 +-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 37 +-
.../kafka/spout/KafkaSpoutRetryLimitTest.java | 116 ++
.../kafka/spout/MaxUncommittedOffsetTest.java | 10 +-
.../storm/kafka/spout/NamedTopicFilterTest.java | 70 +
.../kafka/spout/PatternTopicFilterTest.java | 75 +
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 10 +-
.../SpoutWithMockedConsumerSetupHelper.java | 87 +
.../SingleTopicKafkaSpoutConfiguration.java | 48 +-
.../test/KafkaSpoutTopologyMainNamedTopics.java | 4 +-
.../KafkaSpoutTopologyMainWildcardTopics.java | 4 +-
integration-test/config/install-zookeeper.sh | 2 +-
integration-test/run-it.sh | 3 +-
.../org/apache/storm/command/config_value.clj | 6 +-
.../src/clj/org/apache/storm/converter.clj | 11 +-
.../src/clj/org/apache/storm/daemon/common.clj | 4 +-
.../clj/org/apache/storm/daemon/executor.clj | 14 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 71 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 15 +-
storm-core/src/jvm/org/apache/storm/Config.java | 16 +-
.../storm/cluster/StormClusterStateImpl.java | 16 +-
.../storm/daemon/supervisor/AdvancedFSOps.java | 16 +-
.../storm/daemon/supervisor/Container.java | 6 +-
.../daemon/supervisor/ReadClusterState.java | 3 +
.../apache/storm/daemon/supervisor/Slot.java | 11 +
.../storm/daemon/supervisor/Supervisor.java | 16 +-
.../daemon/supervisor/SupervisorUtils.java | 11 +-
.../daemon/supervisor/timer/UpdateBlobs.java | 26 +-
.../jvm/org/apache/storm/drpc/DRPCSpout.java | 6 -
.../org/apache/storm/generated/Assignment.java | 114 +-
.../apache/storm/generated/LocalAssignment.java | 114 +-
.../org/apache/storm/generated/StormBase.java | 114 +-
.../apache/storm/localizer/AsyncLocalizer.java | 37 +-
.../jvm/org/apache/storm/scheduler/Cluster.java | 85 +-
.../apache/storm/scheduler/TopologyDetails.java | 35 +-
.../multitenant/MultitenantScheduler.java | 2 +-
.../storm/security/INimbusCredentialPlugin.java | 24 +-
.../security/auth/ICredentialsRenewer.java | 18 +-
.../storm/security/auth/kerberos/AutoTGT.java | 6 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 31 +-
.../org/apache/storm/utils/DisruptorQueue.java | 70 +-
.../org/apache/storm/utils/ObjectReader.java | 58 +
.../src/jvm/org/apache/storm/utils/Utils.java | 52 +-
storm-core/src/py/storm/ttypes.py | 47 +-
storm-core/src/storm.thrift | 5 +
.../test/clj/org/apache/storm/cluster_test.clj | 8 +-
.../scheduler/multitenant_scheduler_test.clj | 77 +-
.../scheduler/resource_aware_scheduler_test.clj | 49 +-
.../clj/org/apache/storm/scheduler_test.clj | 10 +-
.../test/jvm/org/apache/storm/ConfigTest.java | 92 ++
.../storm/daemon/supervisor/ContainerTest.java | 6 +-
.../storm/localizer/AsyncLocalizerTest.java | 8 +-
.../org/apache/storm/scheduler/ClusterTest.java | 111 ++
.../resource/TestResourceAwareScheduler.java | 183 ++-
.../storm/scheduler/resource/TestUser.java | 7 +-
.../TestUtilsForResourceAwareScheduler.java | 34 +-
.../eviction/TestDefaultEvictionStrategy.java | 118 +-
.../TestDefaultResourceAwareStrategy.java | 8 +-
.../org/apache/storm/utils/ConfigUtilsTest.java | 98 ++
.../jvm/org/apache/storm/utils/UtilsTest.java | 84 +
storm-dist/binary/src/main/assembly/binary.xml | 10 +-
.../src/main/resources/resources/storm.py | 2 +-
155 files changed, 5523 insertions(+), 3666 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 95e43f6,52063fc..250ace1
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -233,10 -231,7 +233,10 @@@
(str "executor" executor-id "-send-queue")
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ (.getStormId worker-context)
+ component-id
+ (.getThisWorkerPort worker-context)
- :producer-type :single-threaded
+ :producer-type :multi-threaded
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
]
@@@ -820,9 -811,8 +822,9 @@@
(let [delta (tuple-time-delta! tuple)]
(when debug?
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+ (.mark ^Meter (:acked-meter (:executor-data task-data)))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
- (when delta
+ (when (<= 0 delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
@@@ -836,9 -826,8 +838,9 @@@
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(when debug?
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
+ (.mark ^Meter (:failed-meter (:executor-data task-data)))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
- (when delta
+ (when (<= 0 delta)
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 5c0a2fb,5fd4b84..ca8568c
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@@ -34,9 -34,6 +34,8 @@@ import com.lmax.disruptor.dsl.ProducerT
import org.apache.storm.Config;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.DisruptorMetrics;
+import org.apache.storm.metrics2.StormMetricRegistry;
- import org.apache.storm.task.WorkerTopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -62,19 -59,18 +61,19 @@@ import java.util.concurrent.locks.Reent
* the ability to catch up to the producer by processing tuples in batches.
*/
public class DisruptorQueue implements IStatefulObject {
-- private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
++ private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
private static final FlusherPool FLUSHER = new FlusherPool();
--
+ private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
++
private static int getNumFlusherPoolThreads() {
int numThreads = 100;
try {
-- Map<String, Object> conf = Utils.readStormConfig();
-- numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
++ Map<String, Object> conf = Utils.readStormConfig();
++ numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
} catch (Exception e) {
-- LOG.warn("Error while trying to read system config", e);
++ LOG.warn("Error while trying to read system config", e);
}
try {
String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads));
@@@ -86,8 -82,8 +85,8 @@@
return numThreads;
}
-- private static class FlusherPool {
-- private static final String THREAD_PREFIX = "disruptor-flush";
++ private static class FlusherPool {
++ private static final String THREAD_PREFIX = "disruptor-flush";
private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true);
private ThreadPoolExecutor _exec;
private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>();
@@@ -201,8 -197,8 +200,8 @@@
if (block) {
_flushLock.lock();
} else if (!_flushLock.tryLock()) {
-- //Someone else if flushing so don't do anything
-- return;
++ //Someone else if flushing so don't do anything
++ return;
}
try {
while (!_overflow.isEmpty()) {
@@@ -256,7 -252,7 +255,7 @@@
}
}
-- if (!flushed) {
++ if (!flushed) {
_overflow.add(_currentBatch);
_currentBatch = new ArrayList<Object>(_inputBatchSize);
}
@@@ -276,8 -272,8 +275,8 @@@
if (block) {
_flushLock.lock();
} else if (!_flushLock.tryLock()) {
-- //Someone else if flushing so don't do anything
-- return;
++ //Someone else if flushing so don't do anything
++ return;
}
try {
while (!_overflow.isEmpty()) {
@@@ -349,11 -345,9 +348,17 @@@
return (1.0F * population() / capacity());
}
+ public double arrivalRate(){
+ return _rateTracker.reportRate();
+ }
+
+ public double sojournTime(){
++ return tuplePopulation.get() / Math.max(arrivalRate(), 0.00001) * 1000.0;
++ }
++
+ public Object getState() {
+ Map state = new HashMap<String, Object>();
+
// get readPos then writePos so it's never an under-estimate
long rp = readPos();
long wp = writePos();
@@@ -393,8 -393,7 +404,8 @@@
private final int _inputBatchSize;
private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
private final Flusher _flusher;
- private final QueueMetrics _metrics; // old metrics API
+ private final QueueMetrics _metrics;
+ private final DisruptorMetrics _disruptorMetrics;
private String _queueName = "";
private DisruptorBackpressureCallback _cb = null;
@@@ -402,9 -401,10 +413,10 @@@
private int _lowWaterMark = 0;
private boolean _enableBackpressure = false;
private final AtomicLong _overflowCount = new AtomicLong(0);
+ private final AtomicLong tuplePopulation = new AtomicLong(0);
private volatile boolean _throttleOn = false;
- public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
+ public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) {
this._queueName = PREFIX + queueName;
WaitStrategy wait;
if (readTimeout <= 0) {
@@@ -425,13 -424,6 +437,12 @@@
_flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
_flusher.start();
-
+ METRICS_TIMER.schedule(new TimerTask(){
+ @Override
+ public void run() {
+ _disruptorMetrics.set(_metrics);
+ }
- }, 15000, 15000); // TODO: Configurable interval
++ }, 15000, 15000);
}
public String getName() {
@@@ -619,4 -612,4 +631,4 @@@
public QueueMetrics getMetrics() {
return _metrics;
}
--}
++}