You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/07/21 17:14:10 UTC
[1/2] storm git commit: Merge branch 'STORM-2557' of https://github.com/revans2/incubator-storm into STORM-2621
Repository: storm
Updated Branches:
refs/heads/1.x-branch 0f31560ac -> 0efb94ce0
Merge branch 'STORM-2557' of https://github.com/revans2/incubator-storm into STORM-2621
STORM-2621: add tuple_population metric
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fb2b9e5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fb2b9e5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fb2b9e5d
Branch: refs/heads/1.x-branch
Commit: fb2b9e5d680c1ee4bc822787b4a4d2f0b06d4983
Parents: 0f31560
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 21 10:40:39 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 21 10:55:12 2017 -0500
----------------------------------------------------------------------
.../jvm/org/apache/storm/utils/DisruptorQueue.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fb2b9e5d/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fe90240..5fd4b84 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -352,15 +352,18 @@ public class DisruptorQueue implements IStatefulObject {
long rp = readPos();
long wp = writePos();
+ final long tuplePop = tuplePopulation.get();
+
final double arrivalRateInSecs = _rateTracker.reportRate();
//Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
// If this assumption does not hold, the calculation of sojourn time should also consider
// departure rate according to Queuing Theory.
- final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+ final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
state.put("capacity", capacity());
state.put("population", wp - rp);
+ state.put("tuple_population", tuplePop);
state.put("write_pos", wp);
state.put("read_pos", rp);
state.put("arrival_rate_secs", arrivalRateInSecs);
@@ -372,6 +375,11 @@ public class DisruptorQueue implements IStatefulObject {
public void notifyArrivals(long counts) {
_rateTracker.notify(counts);
+ tuplePopulation.getAndAdd(counts);
+ }
+
+ public void notifyDepartures(long counts) {
+ tuplePopulation.getAndAdd(-counts);
}
public void close() {
@@ -393,6 +401,7 @@ public class DisruptorQueue implements IStatefulObject {
private int _lowWaterMark = 0;
private boolean _enableBackpressure = false;
private final AtomicLong _overflowCount = new AtomicLong(0);
+ private final AtomicLong tuplePopulation = new AtomicLong(0);
private volatile boolean _throttleOn = false;
public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
@@ -469,6 +478,7 @@ public class DisruptorQueue implements IStatefulObject {
} else if (o == null) {
LOG.error("NULL found in {}:{}", this.getName(), cursor);
} else {
+ _metrics.notifyDepartures(getTupleCount(o));
handler.onEvent(o, curr, curr == cursor);
if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) {
try {
@@ -545,8 +555,8 @@ public class DisruptorQueue implements IStatefulObject {
at++;
numberOfTuples += getTupleCount(obj);
}
- _buffer.publish(begin, end);
_metrics.notifyArrivals(numberOfTuples);
+ _buffer.publish(begin, end);
}
}
[2/2] storm git commit: Added STORM-2621 to Changelog
Posted by bo...@apache.org.
Added STORM-2621 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0efb94ce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0efb94ce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0efb94ce
Branch: refs/heads/1.x-branch
Commit: 0efb94ce050d193d28464970771f78d4ed2162be
Parents: fb2b9e5
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 21 12:13:53 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 21 12:13:53 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0efb94ce/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 14b4455..eefc92f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
* STORM-2482: Refactor the Storm auto credential plugins to be more usable
## 1.1.1
+ * STORM-2621: add tuple_population metric
* STORM-2639: Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
* STORM-2544: Fixing issue in acking of tuples that hit retry limit under manual commit mode
* STORM-2618: Add TridentKafkaStateUpdater for storm-kafka-client