You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/07/21 17:14:10 UTC

[1/2] storm git commit: Merge branch 'STORM-2557' of https://github.com/revans2/incubator-storm into STORM-2621

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 0f31560ac -> 0efb94ce0


Merge branch 'STORM-2557' of https://github.com/revans2/incubator-storm into STORM-2621

STORM-2621: add tuple_population metric


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fb2b9e5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fb2b9e5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fb2b9e5d

Branch: refs/heads/1.x-branch
Commit: fb2b9e5d680c1ee4bc822787b4a4d2f0b06d4983
Parents: 0f31560
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 21 10:40:39 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 21 10:55:12 2017 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/utils/DisruptorQueue.java    | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fb2b9e5d/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fe90240..5fd4b84 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -352,15 +352,18 @@ public class DisruptorQueue implements IStatefulObject {
             long rp = readPos();
             long wp = writePos();
 
+            final long tuplePop = tuplePopulation.get();
+
             final double arrivalRateInSecs = _rateTracker.reportRate();
 
             //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
             // If this assumption does not hold, the calculation of sojourn time should also consider
             // departure rate according to Queuing Theory.
-            final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+            final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
 
             state.put("capacity", capacity());
             state.put("population", wp - rp);
+            state.put("tuple_population", tuplePop);
             state.put("write_pos", wp);
             state.put("read_pos", rp);
             state.put("arrival_rate_secs", arrivalRateInSecs);
@@ -372,6 +375,11 @@ public class DisruptorQueue implements IStatefulObject {
 
         public void notifyArrivals(long counts) {
             _rateTracker.notify(counts);
+            tuplePopulation.getAndAdd(counts);
+        }
+
+        public void notifyDepartures(long counts) {
+            tuplePopulation.getAndAdd(-counts);
         }
 
         public void close() {
@@ -393,6 +401,7 @@ public class DisruptorQueue implements IStatefulObject {
     private int _lowWaterMark = 0;
     private boolean _enableBackpressure = false;
     private final AtomicLong _overflowCount = new AtomicLong(0);
+    private final AtomicLong tuplePopulation = new AtomicLong(0);
     private volatile boolean _throttleOn = false;
 
     public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
@@ -469,6 +478,7 @@ public class DisruptorQueue implements IStatefulObject {
                 } else if (o == null) {
                     LOG.error("NULL found in {}:{}", this.getName(), cursor);
                 } else {
+                    _metrics.notifyDepartures(getTupleCount(o));
                     handler.onEvent(o, curr, curr == cursor);
                     if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) {
                         try {
@@ -545,8 +555,8 @@ public class DisruptorQueue implements IStatefulObject {
                 at++;
                 numberOfTuples += getTupleCount(obj);
             }
-            _buffer.publish(begin, end);
             _metrics.notifyArrivals(numberOfTuples);
+            _buffer.publish(begin, end);
         }
     }
 


[2/2] storm git commit: Added STORM-2621 to Changelog

Posted by bo...@apache.org.
Added STORM-2621 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0efb94ce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0efb94ce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0efb94ce

Branch: refs/heads/1.x-branch
Commit: 0efb94ce050d193d28464970771f78d4ed2162be
Parents: fb2b9e5
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 21 12:13:53 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 21 12:13:53 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0efb94ce/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 14b4455..eefc92f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
  * STORM-2482: Refactor the Storm auto credential plugins to be more usable
 
 ## 1.1.1
+ * STORM-2621: add tuple_population metric
  * STORM-2639: Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
  * STORM-2544: Fixing issue in acking of tuples that hit retry limit under manual commit mode
  * STORM-2618: Add TridentKafkaStateUpdater for storm-kafka-client