Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2015/10/24 16:20:00 UTC

[GitHub] storm pull request: STORM-1128: Make metrics fast

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/816

    STORM-1128: Make metrics fast

    My profiling shows that metrics contribute significantly to Storm's CPU utilization, which slows it down.  With this patch I was able to go from 6,500 sentences/second fully counted to 11,000 on a MacBook Pro running two workers in the Throughput vs. Latency test.  If I turn off sub-sampling of metrics, the performance goes back to about where it was before.  There are still more optimizations that could be done with the metrics, like having the latency metric also report counts so we don't have to do an extra lookup and inc operation on the critical path, but for now this is good enough and allows me to start looking at the next bottleneck.
    
    These changes also complement the batching changes very well.  Batching by itself went from 6,500 sentences/second to 15,000.  Adding this patch increases that to 25,000 sentences/second, almost a 4x improvement from using both together.
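
For readers unfamiliar with metric sub-sampling, the idea can be sketched roughly as below. This is only an illustration under stated assumptions: the class name `SampledCounter`, the fixed every-Nth scheme, and the scale-up by N are hypothetical and are not taken from the patch, whose sampler may work differently.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of sub-sampled counting; not the code from this pull request. */
class SampledCounter {
    private final int sampleEvery;     // assumed fixed sampling interval N
    private int sinceLastSample;       // touched only by the single owning thread
    private final AtomicLong count = new AtomicLong();

    SampledCounter(int sampleEvery) {
        if (sampleEvery < 1) {
            throw new IllegalArgumentException("sampleEvery must be >= 1");
        }
        this.sampleEvery = sampleEvery;
    }

    /** Called once per tuple; only every Nth call pays for the metric update. */
    void onTuple() {
        if (++sinceLastSample >= sampleEvery) {
            sinceLastSample = 0;
            // Scale the sampled event back up so the count approximates the true total.
            count.addAndGet(sampleEvery);
        }
    }

    /** Approximate count: always a multiple of sampleEvery. */
    long approximateCount() {
        return count.get();
    }

    public static void main(String[] args) {
        SampledCounter counter = new SampledCounter(20);
        for (int i = 0; i < 1000; i++) {
            counter.onTuple();
        }
        System.out.println("approximate count: " + counter.approximateCount());
    }
}
```

With an interval of 1 every tuple is counted exactly, which corresponds to turning sub-sampling off and paying the metric cost on every tuple.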

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm fast-counting

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #816
    
----
commit e75219b5a4856c39c4031c3d6205105e034d10af
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2015-10-22T15:48:04Z

    STORM-1128: Make metrics fast

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/816#discussion_r49327965
  
    --- Diff: storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java ---
    @@ -0,0 +1,211 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.metric.internal;
    +
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import backtype.storm.metric.api.IMetric;
    +
    +/**
    + * Acts as a Count Metric, but also keeps track of approximate counts
    + * for the last 10 mins, 3 hours, 1 day, and all time.
    + */
    +public class CountStatAndMetric implements IMetric{
    +    private final AtomicLong _currentBucket;
    +    // All internal state except for the count of the current bucket are
    +    // protected using a lock on this counter
    +    private long _bucketStart;
    +
    +    //exact variable time, that is added to the current bucket
    +    private long _exactExtra;
    + 
    +    //10 min values
    +    private final int _tmSize;
    +    private final long[] _tmBuckets;
    +    private final long[] _tmTime;
    +    
    +    //3 hour values
    +    private final int _thSize;
    +    private final long[] _thBuckets;
    +    private final long[] _thTime;
    +
    +    //1 day values
    +    private final int _odSize;
    +    private final long[] _odBuckets;
    +    private final long[] _odTime;
    + 
    +    //all time
    +    private long _allTime;
    +
    +    private final TimerTask _task;
    +
    +    /**
    +     * @param numBuckets the number of buckets to divide the time periods into.
    +     */
    +    public CountStatAndMetric(int numBuckets) {
    +        this(numBuckets, -1);
    +    }
    +
    +    /**
    +     * Constructor
    +     * @param numBuckets the number of buckets to divide the time periods into.
    +     * @param startTime if positive the simulated time to start the from.
    +     */
    +    CountStatAndMetric(int numBuckets, long startTime){
    +        numBuckets = Math.max(numBuckets, 2);
    +        //We want to capture the full time range, so the target size is as
    +        // if we had one bucket less, then we do
    +        _tmSize = 10 * 60 * 1000 / (numBuckets - 1);
    +        _thSize = 3 * 60 * 60 * 1000 / (numBuckets - 1);
    +        _odSize = 24 * 60 * 60 * 1000 / (numBuckets - 1);
    +        if (_tmSize < 1 || _thSize < 1 || _odSize < 1) {
    +            throw new IllegalArgumentException("number of buckets is too large to be supported");
    +        }
    +        _tmBuckets = new long[numBuckets];
    +        _tmTime = new long[numBuckets];
    +        _thBuckets = new long[numBuckets];
    +        _thTime = new long[numBuckets];
    +        _odBuckets = new long[numBuckets];
    +        _odTime = new long[numBuckets];
    +        _allTime = 0;
    +        _exactExtra = 0;
    +
    +        _bucketStart = startTime >= 0 ? startTime : System.currentTimeMillis();
    +        _currentBucket = new AtomicLong(0);
    +        if (startTime < 0) {
    +            _task = new Fresher();
    +            MetricStatTimer._timer.scheduleAtFixedRate(_task, _tmSize, _tmSize);
    +        } else {
    +            _task = null;
    +        }
    +    }
    +
    +    /**
    +     * Increase the count by the given value.
    +     *
    +     * @param count number to count
    +     */
    +    public void incBy(long count) {
    +        _currentBucket.addAndGet(count);
    +    }
    +
    +   
    +
    +    @Override
    +    public synchronized Object getValueAndReset() {
    +        return getValueAndReset(System.currentTimeMillis());
    +    }
    +
    +    synchronized Object getValueAndReset(long now) {
    +        long value = _currentBucket.getAndSet(0);
    +        long timeSpent = now - _bucketStart;
    +        long ret = value + _exactExtra;
    +        _bucketStart = now;
    +        _exactExtra = 0;
    +        rotateBuckets(value, timeSpent);
    +        return ret;
    +    }
    +
    +    synchronized void rotateSched(long now) {
    +        long value = _currentBucket.getAndSet(0);
    +        long timeSpent = now - _bucketStart;
    +        _exactExtra += value;
    +        _bucketStart = now;
    +        rotateBuckets(value, timeSpent);
    +    }
    +
    +    synchronized void rotateBuckets(long value, long timeSpent) {
    +        rotate(value, timeSpent, _tmSize, _tmTime, _tmBuckets);
    +        rotate(value, timeSpent, _thSize, _thTime, _thBuckets);
    +        rotate(value, timeSpent, _odSize, _odTime, _odBuckets);
    +        _allTime += value;
    +    }
    +
    +    private synchronized void rotate(long value, long timeSpent, long targetSize, long [] times, long [] buckets) {
    +        times[0] += timeSpent;
    +        buckets[0] += value;
    +
    +        long currentTime = 0;
    +        long currentVal = 0;
    +        if (times[0] >= targetSize) {
    +            for (int i = 0; i < buckets.length; i++) {
    --- End diff --
    
    Yes, we could, but I don't think it is going to make much of a difference.  I purposely moved rotating the metrics off of the critical path.  The call to rotate happens in a background thread that is shared by all similar metrics in a worker process.  This allows the critical path, the thing that happens all of the time, to be just an atomic increment.  Rotation also only happens every 30 seconds or so.  So I suspect the best it would do is reduce the overall CPU the worker uses by a tiny amount.
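
The split described above, an atomic increment on the hot path and rotation on a shared background thread, can be sketched roughly as follows. This is a simplified illustration, not the patch itself: `BackgroundRotatedCounter` and `scheduleOn` are hypothetical names, and a single guarded total stands in for the bucket arrays in the diff.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: lock-free hot path, synchronized background rotation. */
class BackgroundRotatedCounter {
    // Hot-path state: updated by worker threads without taking a lock.
    private final AtomicLong current = new AtomicLong();
    // Cold-path state: guarded by the lock on this object.
    private long total;

    /** Critical path: a single atomic add. */
    void incBy(long count) {
        current.addAndGet(count);
    }

    /** Background path: drain the hot counter and fold it into the guarded total. */
    synchronized void rotate() {
        total += current.getAndSet(0);
    }

    synchronized long total() {
        return total;
    }

    /** Registers this counter on a timer that many counters can share (one thread). */
    TimerTask scheduleOn(Timer sharedTimer, long periodMs) {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                rotate();
            }
        };
        sharedTimer.scheduleAtFixedRate(task, periodMs, periodMs);
        return task;
    }

    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer("metric-rotator", true); // daemon thread, assumed name
        BackgroundRotatedCounter counter = new BackgroundRotatedCounter();
        counter.scheduleOn(timer, 50);
        for (int i = 0; i < 1000; i++) {
            counter.incBy(1);
        }
        Thread.sleep(200); // give the background task time to run at least once
        System.out.println("folded total: " + counter.total());
        timer.cancel();
    }
}
```

Because `incBy` never takes the lock, the cost of `rotate` is paid only by the timer thread, however long the rotation itself takes.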



[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/816



[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/816#discussion_r49328566
  
    --- Diff: storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java ---
    +    private synchronized void rotate(long value, long timeSpent, long targetSize, long [] times, long [] buckets) {
    +        times[0] += timeSpent;
    +        buckets[0] += value;
    +
    +        long currentTime = 0;
    +        long currentVal = 0;
    +        if (times[0] >= targetSize) {
    +            for (int i = 0; i < buckets.length; i++) {
    --- End diff --
    
    Right. I missed that it is not in the critical path.



[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/816#discussion_r49265495
  
    --- Diff: storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java ---
    +    private synchronized void rotate(long value, long timeSpent, long targetSize, long [] times, long [] buckets) {
    +        times[0] += timeSpent;
    +        buckets[0] += value;
    +
    +        long currentTime = 0;
    +        long currentVal = 0;
    +        if (times[0] >= targetSize) {
    +            for (int i = 0; i < buckets.length; i++) {
    --- End diff --
    
    @revans2 Instead of making a pass over the array, can we keep an index for each array and rotate the index instead?
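
One way to read this suggestion is as a ring buffer: keep a head index per array and advance it rather than shifting every element. The sketch below is a hypothetical illustration of that idea (`RingBuckets` is not a class in the patch) and it leaves out the per-bucket time tracking that the code in the diff also performs.

```java
/** Hypothetical ring of buckets: rotating moves an index instead of shifting elements. */
class RingBuckets {
    private final long[] buckets;
    private int head; // index of the newest bucket

    RingBuckets(int numBuckets) {
        if (numBuckets < 1) {
            throw new IllegalArgumentException("need at least one bucket");
        }
        buckets = new long[numBuckets];
    }

    /** Adds to the newest bucket. */
    void add(long value) {
        buckets[head] += value;
    }

    /** O(1) rotation: advance the head and reuse the slot that held the oldest bucket. */
    void rotate() {
        head = (head + 1) % buckets.length;
        buckets[head] = 0;
    }

    /** Window total; still a pass over the array unless a running sum is kept as well. */
    long sum() {
        long total = 0;
        for (long b : buckets) {
            total += b;
        }
        return total;
    }

    public static void main(String[] args) {
        RingBuckets ring = new RingBuckets(3);
        ring.add(5);
        ring.rotate();
        ring.add(2);
        ring.rotate();
        ring.add(1);
        ring.rotate(); // wraps around and drops the bucket that held 5
        System.out.println("window sum: " + ring.sum());
    }
}
```

Rotation becomes constant time, though reading the window total still touches every bucket in this sketch.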



[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/816#issuecomment-150828548
  
    Just added some fixes to remove reflection from the critical path.  The patch is now able to support 12,000 sentences/second.  It seems to be just shy of 13,000: it can sustain that rate for a while once the JIT has kicked in, but falls behind very slowly.



[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the pull request:

    https://github.com/apache/storm/pull/816#issuecomment-150853651
  
    LGTM. +1



[GitHub] storm pull request: STORM-1128: Make metrics fast

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/816#issuecomment-150834384
  
    @revans2 I ran some tests with regular topologies. The patch looks good to me. For measurements, are you running ThroughputVsLatency? If so, I am getting 15,000 acked/sec.
    +1

