You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/13 04:15:20 UTC

[1/5] storm git commit: STORM-1700 Introduce 'whitelist' / 'blacklist' option to MetricsConsumer

Repository: storm
Updated Branches:
  refs/heads/master ded5a0df1 -> d42276f6f


STORM-1700 Introduce 'whitelist' / 'blacklist' option to MetricsConsumer

* Users can set whitelist or blacklist to filter out metrics by name
  * if neither is specified (the default), no metrics are filtered out
* how to match: substring match with regular expression
  * use both ^ and $ when you want to match strictly (full string match)
* cache whether the metric name is filtered in or out
  * kinds of metrics do not change during the lifecycle of the topology
  * so applying the regexes to every metric each time would be a waste of CPU
* added unit test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/565d53dc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/565d53dc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/565d53dc

Branch: refs/heads/master
Commit: 565d53dc655a4ffa5626b81658379cdf96a13fff
Parents: 4dd6de9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Apr 9 14:11:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:26 2016 +0900

----------------------------------------------------------------------
 conf/storm.yaml.example                         |   9 ++
 .../org/apache/storm/daemon/StormCommon.java    |   7 +-
 .../storm/metric/MetricsConsumerBolt.java       |  38 +++++--
 .../storm/metric/filter/FilterByMetricName.java | 110 +++++++++++++++++++
 .../storm/metric/filter/MetricsFilter.java      |  26 +++++
 .../metric/filter/FilterByMetricNameTest.java   |  95 ++++++++++++++++
 6 files changed, 275 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 0e8b354..c6d9544 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -39,12 +39,21 @@
 #     - "server2"
 
 ## Metrics Consumers
+## max.retain.metric.tuples
+## - task queue will be unbounded when max.retain.metric.tuples is equal or less than 0.
+## whitelist / blacklist
+## - when none of configuration for metric filter are specified, it'll be treated as 'pass all'.
+## - you need to specify either whitelist or blacklist, or none of them. You can't specify both of them.
+## - you can specify multiple whitelist / blacklist with regular expression
 # topology.metrics.consumer.register:
 #   - class: "org.apache.storm.metric.LoggingMetricsConsumer"
 #     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #   - class: "org.mycompany.MyMetricsConsumer"
 #     max.retain.metric.tuples: 100
+#     whitelist:
+#       - "execute.*"
+#       - "^__complete-latency$"
 #     parallelism.hint: 1
 #     argument:
 #       - endpoint: "metrics-collector.mycompany.org"

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 0dbb9f2..5097167 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -38,6 +38,7 @@ import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.metric.EventLoggerBolt;
 import org.apache.storm.metric.MetricsConsumerBolt;
 import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.metric.filter.FilterByMetricName;
 import org.apache.storm.security.auth.IAuthorizer;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.testing.NonRichBoltTracker;
@@ -387,8 +388,12 @@ public class StormCommon {
                 Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
                 Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                 metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+                List<String> whitelist = (List<String>) info.get("whitelist");
+                List<String> blacklist = (List<String>) info.get("blacklist");
+                FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
                 Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
-                        new MetricsConsumerBolt(className, argument, maxRetainMetricTuples), null, phintNum, metricsConsumerConf);
+                        new MetricsConsumerBolt(className, argument, maxRetainMetricTuples, filterPredicate),
+                        null, phintNum, metricsConsumerConf);
 
                 String id = className;
                 if (classOccurrencesMap.containsKey(className)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 95f9137..16a253c 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.metric;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.task.IBolt;
@@ -27,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -39,15 +43,25 @@ public class MetricsConsumerBolt implements IBolt {
     OutputCollector _collector;
     Object _registrationArgument;
     private final int _maxRetainMetricTuples;
+    private Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
 
-    private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>();
+    private final BlockingQueue<MetricsTask> _taskQueue;
     private Thread _taskExecuteThread;
     private volatile boolean _running = true;
 
-    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) {
+    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
+                               Predicate<IMetricsConsumer.DataPoint> filterPredicate) {
+
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
         _maxRetainMetricTuples = maxRetainMetricTuples;
+        _filterPredicate = filterPredicate;
+
+        if (_maxRetainMetricTuples > 0) {
+            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
+        } else {
+            _taskQueue = new LinkedBlockingDeque<>();
+        }
     }
 
     @Override
@@ -67,17 +81,22 @@ public class MetricsConsumerBolt implements IBolt {
     
     @Override
     public void execute(Tuple input) {
-        // remove older tasks if task queue exceeds the max size
-        if (_taskQueue.size() > _maxRetainMetricTuples) {
-            while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
-                _taskQueue.poll();
-            }
+        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
+        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
+        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(dataPoints);
+        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);
+
+        while (! _taskQueue.offer(metricsTask)) {
+            _taskQueue.poll();
         }
 
-        _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)));
         _collector.ack(input);
     }
 
+    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
+        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
+    }
+
     @Override
     public void cleanup() {
         _running = false;
@@ -85,7 +104,7 @@ public class MetricsConsumerBolt implements IBolt {
         _taskExecuteThread.interrupt();
     }
 
-    class MetricsTask {
+    static class MetricsTask {
         private IMetricsConsumer.TaskInfo taskInfo;
         private Collection<IMetricsConsumer.DataPoint> dataPoints;
 
@@ -119,4 +138,5 @@ public class MetricsConsumerBolt implements IBolt {
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
new file mode 100644
index 0000000..9c95428
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class FilterByMetricName implements MetricsFilter {
+    private final Cache<String, Boolean> filterCache;
+    private final List<Pattern> whitelistPattern;
+    private final List<Pattern> blacklistPattern;
+    private boolean noneSpecified = false;
+
+    public FilterByMetricName(List<String> whitelistPattern, List<String> blacklistPattern) {
+        // guard NPE
+        if (whitelistPattern == null) {
+            this.whitelistPattern = Collections.emptyList();
+        } else {
+            this.whitelistPattern = convertPatternStringsToPatternInstances(whitelistPattern);
+        }
+
+        // guard NPE
+        if (blacklistPattern == null) {
+            this.blacklistPattern = Collections.emptyList();
+        } else {
+            this.blacklistPattern = convertPatternStringsToPatternInstances(blacklistPattern);
+        }
+
+        if (this.whitelistPattern.isEmpty() && this.blacklistPattern.isEmpty()) {
+            noneSpecified = true;
+        } else if (!this.whitelistPattern.isEmpty() && !this.blacklistPattern.isEmpty()) {
+            throw new IllegalArgumentException("You have to specify either includes or excludes, or none.");
+        }
+
+        filterCache = CacheBuilder.newBuilder()
+                .maximumSize(1000)
+                .build();
+    }
+
+    @Override
+    public boolean apply(IMetricsConsumer.DataPoint dataPoint) {
+        if (noneSpecified) {
+            return true;
+        }
+
+        String metricName = dataPoint.name;
+
+        Boolean cachedFilteredIn = filterCache.getIfPresent(metricName);
+        if (cachedFilteredIn != null) {
+            return cachedFilteredIn;
+        } else {
+            boolean filteredIn = isFilteredIn(metricName);
+            filterCache.put(metricName, filteredIn);
+            return filteredIn;
+        }
+    }
+
+    private ArrayList<Pattern> convertPatternStringsToPatternInstances(List<String> patterns) {
+        return Lists.newArrayList(Iterators.transform(patterns.iterator(), new Function<String, Pattern>() {
+            @Override
+            public Pattern apply(String s) {
+                return Pattern.compile(s);
+            }
+        }));
+    }
+
+    private boolean isFilteredIn(String metricName) {
+        if (!whitelistPattern.isEmpty()) {
+            return checkMatching(metricName, whitelistPattern, true);
+        } else if (!blacklistPattern.isEmpty()) {
+            return checkMatching(metricName, blacklistPattern, false);
+        }
+
+        throw new IllegalStateException("Shouldn't reach here");
+    }
+
+    private boolean checkMatching(String metricName, List<Pattern> patterns, boolean valueWhenMatched) {
+        for (Pattern pattern : patterns) {
+            if (pattern.matcher(metricName).find()) {
+                return valueWhenMatched;
+            }
+        }
+
+        return !valueWhenMatched;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
new file mode 100644
index 0000000..f12f706
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Predicate;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.io.Serializable;
+
+public interface MetricsFilter extends Predicate<IMetricsConsumer.DataPoint>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
new file mode 100644
index 0000000..d2f11d6
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.*;
+
+public class FilterByMetricNameTest {
+    @Before
+    public void setUp() throws Exception {
+
+    }
+
+    @Test
+    public void testWhitelist() {
+        List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+");
+        FilterByMetricName sut = new FilterByMetricName(whitelistPattern, null);
+
+        Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+        testMetricNamesAndExpected.put("storm.metric.hello", false);
+        testMetricNamesAndExpected.put("test.hello.world", false);
+        testMetricNamesAndExpected.put("test.hello.123", true);
+        testMetricNamesAndExpected.put("test.metric.world", false);
+        testMetricNamesAndExpected.put("metric.world", true);
+
+        assertTests(sut, testMetricNamesAndExpected);
+    }
+
+    @Test
+    public void testBlacklist() {
+        List<String> blacklistPattern = Lists.newArrayList("^__", "test\\.");
+        FilterByMetricName sut = new FilterByMetricName(null, blacklistPattern);
+
+        Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+        testMetricNamesAndExpected.put("__storm.metric.hello", false);
+        testMetricNamesAndExpected.put("storm.metric.__hello", true);
+        testMetricNamesAndExpected.put("test.hello.world", false);
+        testMetricNamesAndExpected.put("storm.test.123", false);
+        testMetricNamesAndExpected.put("metric.world", true);
+
+        assertTests(sut, testMetricNamesAndExpected);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBothWhitelistAndBlacklistAreSpecified() {
+        List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+");
+        List<String> blacklistPattern = Lists.newArrayList("^__", "test\\.");
+        new FilterByMetricName(whitelistPattern, blacklistPattern);
+    }
+
+    @Test
+    public void testNoneIsSpecified() {
+        FilterByMetricName sut = new FilterByMetricName(null, null);
+
+        Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+        testMetricNamesAndExpected.put("__storm.metric.hello", true);
+        testMetricNamesAndExpected.put("storm.metric.__hello", true);
+        testMetricNamesAndExpected.put("test.hello.world", true);
+        testMetricNamesAndExpected.put("storm.test.123", true);
+        testMetricNamesAndExpected.put("metric.world", true);
+
+        assertTests(sut, testMetricNamesAndExpected);
+    }
+
+    private void assertTests(FilterByMetricName sut, Map<String, Boolean> testMetricNamesAndExpected) {
+        for (Map.Entry<String, Boolean> testEntry : testMetricNamesAndExpected.entrySet()) {
+            assertEquals("actual filter result is not same: " + testEntry.getKey(),
+                    testEntry.getValue(), sut.apply(new IMetricsConsumer.DataPoint(testEntry.getKey(), 1)));
+        }
+    }
+}
\ No newline at end of file


[2/5] storm git commit: STORM-1698 Asynchronous MetricsConsumerBolt

Posted by ka...@apache.org.
STORM-1698 Asynchronous MetricsConsumerBolt

* add new option "max.retain.metric.tuples" to topology metrics consumer
  * if the count of pending tasks exceeds this value, older metric tuples are discarded


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dd6de9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dd6de9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dd6de9c

Branch: refs/heads/master
Commit: 4dd6de9c494df112d668d66d8c10288becc54495
Parents: 79be212
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue May 3 15:12:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:26 2016 +0900

----------------------------------------------------------------------
 conf/storm.yaml.example                                |  2 ++
 .../src/jvm/org/apache/storm/daemon/StormCommon.java   |  4 +++-
 .../org/apache/storm/metric/MetricsConsumerBolt.java   | 13 ++++++++++---
 3 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 7df3e9d..0e8b354 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -41,8 +41,10 @@
 ## Metrics Consumers
 # topology.metrics.consumer.register:
 #   - class: "org.apache.storm.metric.LoggingMetricsConsumer"
+#     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #   - class: "org.mycompany.MyMetricsConsumer"
+#     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #     argument:
 #       - endpoint: "metrics-collector.mycompany.org"

http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 7792052..0dbb9f2 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -383,10 +383,12 @@ public class StormCommon {
             for (Map<String, Object> info : registerInfo) {
                 String className = (String) info.get("class");
                 Object argument = info.get("argument");
+                Integer maxRetainMetricTuples = Utils.getInt(info.get("max.retain.metric.tuples"), 100);
                 Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
                 Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                 metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
-                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
+                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
+                        new MetricsConsumerBolt(className, argument, maxRetainMetricTuples), null, phintNum, metricsConsumerConf);
 
                 String id = className;
                 if (classOccurrencesMap.containsKey(className)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 0aeab34..95f9137 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -26,11 +26,9 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 public class MetricsConsumerBolt implements IBolt {
@@ -40,14 +38,16 @@ public class MetricsConsumerBolt implements IBolt {
     String _consumerClassName;
     OutputCollector _collector;
     Object _registrationArgument;
+    private final int _maxRetainMetricTuples;
 
     private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>();
     private Thread _taskExecuteThread;
     private volatile boolean _running = true;
 
-    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
+    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) {
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
+        _maxRetainMetricTuples = maxRetainMetricTuples;
     }
 
     @Override
@@ -67,6 +67,13 @@ public class MetricsConsumerBolt implements IBolt {
     
     @Override
     public void execute(Tuple input) {
+        // remove older tasks if task queue exceeds the max size
+        if (_taskQueue.size() > _maxRetainMetricTuples) {
+            while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
+                _taskQueue.poll();
+            }
+        }
+
         _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)));
         _collector.ack(input);
     }


[4/5] storm git commit: Merge branch 'STORM-1700'

Posted by ka...@apache.org.
Merge branch 'STORM-1700'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78f728a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78f728a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78f728a3

Branch: refs/heads/master
Commit: 78f728a3d3c33f65c54eefd90f4911a044b030cb
Parents: ded5a0d 565d53d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jun 13 13:06:34 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:34 2016 +0900

----------------------------------------------------------------------
 conf/storm.yaml.example                         |  11 ++
 .../org/apache/storm/daemon/StormCommon.java    |   9 +-
 .../storm/metric/MetricsConsumerBolt.java       |  85 +++++++++++++-
 .../storm/metric/filter/FilterByMetricName.java | 110 +++++++++++++++++++
 .../storm/metric/filter/MetricsFilter.java      |  26 +++++
 .../metric/filter/FilterByMetricNameTest.java   |  95 ++++++++++++++++
 6 files changed, 332 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[5/5] storm git commit: add STORM-1698 and STORM-1700 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1698 and STORM-1700 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d42276f6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d42276f6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d42276f6

Branch: refs/heads/master
Commit: d42276f6ffa0364d79aa9b48993d85fea06b6d28
Parents: 78f728a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jun 13 13:14:50 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:15:10 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d42276f6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b44fd1..42f35c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -105,6 +105,8 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0
+ * STORM-1700: Introduce 'whitelist' / 'blacklist' option to MetricsConsumer
+ * STORM-1698: Asynchronous MetricsConsumerBolt
  * STORM-1887: Fixed help message for set_log_level command
  * STORM-1841: Address a few minor issues in windowing and doc
  * STORM-1709: Added group by support in storm sql standalone mode
@@ -520,4 +522,4 @@
  * STORM-789: Send more topology context to Multi-Lang components via initial handshake
  * STORM-788: UI Fix key for process latencies
  * STORM-787: test-ns should announce test failures with 'BUILD FAILURE'
- * STORM-786: KafkaBolt shoul
\ No newline at end of file
+ * STORM-786: KafkaBolt shoul


[3/5] storm git commit: STORM-1698 Asynchronous MetricsConsumerBolt

Posted by ka...@apache.org.
STORM-1698 Asynchronous MetricsConsumerBolt

* change MetricsConsumerBolt's behavior to an asynchronous manner
  * to avoid bad side effects on the topology
* for details, please refer to the JIRA issue: https://issues.apache.org/jira/browse/STORM-1698


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/79be212b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/79be212b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/79be212b

Branch: refs/heads/master
Commit: 79be212ba81dfbc4f0f8776a4427a986906aefa2
Parents: ded5a0d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Apr 8 13:05:00 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:26 2016 +0900

----------------------------------------------------------------------
 .../storm/metric/MetricsConsumerBolt.java       | 56 +++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/79be212b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 1f3217f..0aeab34 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -23,15 +23,28 @@ import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 public class MetricsConsumerBolt implements IBolt {
+    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
+
     IMetricsConsumer _metricsConsumer;
     String _consumerClassName;
     OutputCollector _collector;
     Object _registrationArgument;
 
+    private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>();
+    private Thread _taskExecuteThread;
+    private volatile boolean _running = true;
+
     public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
@@ -47,17 +60,56 @@ public class MetricsConsumerBolt implements IBolt {
         }
         _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector);
         _collector = collector;
+        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
+        _taskExecuteThread.setDaemon(true);
+        _taskExecuteThread.start();
     }
     
     @Override
     public void execute(Tuple input) {
-        _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
+        _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)));
         _collector.ack(input);
     }
 
     @Override
     public void cleanup() {
+        _running = false;
         _metricsConsumer.cleanup();
+        _taskExecuteThread.interrupt();
+    }
+
+    class MetricsTask {
+        private IMetricsConsumer.TaskInfo taskInfo;
+        private Collection<IMetricsConsumer.DataPoint> dataPoints;
+
+        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
+            this.taskInfo = taskInfo;
+            this.dataPoints = dataPoints;
+        }
+
+        public IMetricsConsumer.TaskInfo getTaskInfo() {
+            return taskInfo;
+        }
+
+        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
+            return dataPoints;
+        }
+    }
+
+    class MetricsHandlerRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            while (_running) {
+                try {
+                    MetricsTask task = _taskQueue.take();
+                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
+                } catch (InterruptedException e) {
+                    break;
+                } catch (Throwable t) {
+                    LOG.error("Exception occurred during handle metrics", t);
+                }
+            }
+        }
     }
-    
 }