You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/13 04:15:20 UTC
[1/5] storm git commit: STORM-1700 Introduce 'whitelist' /
'blacklist' option to MetricsConsumer
Repository: storm
Updated Branches:
refs/heads/master ded5a0df1 -> d42276f6f
STORM-1700 Introduce 'whitelist' / 'blacklist' option to MetricsConsumer
* Users can set whitelist or blacklist to filter out metrics by name
* if neither of them is specified (the default), no metrics are filtered out
* how matching works: each regular expression is searched for anywhere in the metric name (substring match)
* use both ^ and $ when you want to match strictly (full string match)
* cache whether the metric name is filtered in or out
* the kinds of metrics do not change during the lifecycle of the topology
* so applying the regexes to every metric each time could be a waste of CPU
* added unit test
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/565d53dc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/565d53dc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/565d53dc
Branch: refs/heads/master
Commit: 565d53dc655a4ffa5626b81658379cdf96a13fff
Parents: 4dd6de9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Apr 9 14:11:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:26 2016 +0900
----------------------------------------------------------------------
conf/storm.yaml.example | 9 ++
.../org/apache/storm/daemon/StormCommon.java | 7 +-
.../storm/metric/MetricsConsumerBolt.java | 38 +++++--
.../storm/metric/filter/FilterByMetricName.java | 110 +++++++++++++++++++
.../storm/metric/filter/MetricsFilter.java | 26 +++++
.../metric/filter/FilterByMetricNameTest.java | 95 ++++++++++++++++
6 files changed, 275 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 0e8b354..c6d9544 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -39,12 +39,21 @@
# - "server2"
## Metrics Consumers
+## max.retain.metric.tuples
+## - task queue will be unbounded when max.retain.metric.tuples is equal or less than 0.
+## whitelist / blacklist
+## - when none of configuration for metric filter are specified, it'll be treated as 'pass all'.
+## - you need to specify either whitelist or blacklist, or none of them. You can't specify both of them.
+## - you can specify multiple whitelist / blacklist with regular expression
# topology.metrics.consumer.register:
# - class: "org.apache.storm.metric.LoggingMetricsConsumer"
# max.retain.metric.tuples: 100
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
# max.retain.metric.tuples: 100
+# whitelist:
+# - "execute.*"
+# - "^__complete-latency$"
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 0dbb9f2..5097167 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -38,6 +38,7 @@ import org.apache.storm.generated.StreamInfo;
import org.apache.storm.metric.EventLoggerBolt;
import org.apache.storm.metric.MetricsConsumerBolt;
import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.metric.filter.FilterByMetricName;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.task.IBolt;
import org.apache.storm.testing.NonRichBoltTracker;
@@ -387,8 +388,12 @@ public class StormCommon {
Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+ List<String> whitelist = (List<String>) info.get("whitelist");
+ List<String> blacklist = (List<String>) info.get("blacklist");
+ FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
- new MetricsConsumerBolt(className, argument, maxRetainMetricTuples), null, phintNum, metricsConsumerConf);
+ new MetricsConsumerBolt(className, argument, maxRetainMetricTuples, filterPredicate),
+ null, phintNum, metricsConsumerConf);
String id = className;
if (classOccurrencesMap.containsKey(className)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 95f9137..16a253c 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -17,6 +17,9 @@
*/
package org.apache.storm.metric;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IBolt;
@@ -27,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
@@ -39,15 +43,25 @@ public class MetricsConsumerBolt implements IBolt {
OutputCollector _collector;
Object _registrationArgument;
private final int _maxRetainMetricTuples;
+ private Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
- private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>();
+ private final BlockingQueue<MetricsTask> _taskQueue;
private Thread _taskExecuteThread;
private volatile boolean _running = true;
- public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) {
+ public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
+ Predicate<IMetricsConsumer.DataPoint> filterPredicate) {
+
_consumerClassName = consumerClassName;
_registrationArgument = registrationArgument;
_maxRetainMetricTuples = maxRetainMetricTuples;
+ _filterPredicate = filterPredicate;
+
+ if (_maxRetainMetricTuples > 0) {
+ _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
+ } else {
+ _taskQueue = new LinkedBlockingDeque<>();
+ }
}
@Override
@@ -67,17 +81,22 @@ public class MetricsConsumerBolt implements IBolt {
@Override
public void execute(Tuple input) {
- // remove older tasks if task queue exceeds the max size
- if (_taskQueue.size() > _maxRetainMetricTuples) {
- while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
- _taskQueue.poll();
- }
+ IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
+ Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
+ List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(dataPoints);
+ MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);
+
+ while (! _taskQueue.offer(metricsTask)) {
+ _taskQueue.poll();
}
- _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)));
_collector.ack(input);
}
+ private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
+ return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
+ }
+
@Override
public void cleanup() {
_running = false;
@@ -85,7 +104,7 @@ public class MetricsConsumerBolt implements IBolt {
_taskExecuteThread.interrupt();
}
- class MetricsTask {
+ static class MetricsTask {
private IMetricsConsumer.TaskInfo taskInfo;
private Collection<IMetricsConsumer.DataPoint> dataPoints;
@@ -119,4 +138,5 @@ public class MetricsConsumerBolt implements IBolt {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
new file mode 100644
index 0000000..9c95428
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class FilterByMetricName implements MetricsFilter {
+ private final Cache<String, Boolean> filterCache;
+ private final List<Pattern> whitelistPattern;
+ private final List<Pattern> blacklistPattern;
+ private boolean noneSpecified = false;
+
+ public FilterByMetricName(List<String> whitelistPattern, List<String> blacklistPattern) {
+ // guard NPE
+ if (whitelistPattern == null) {
+ this.whitelistPattern = Collections.emptyList();
+ } else {
+ this.whitelistPattern = convertPatternStringsToPatternInstances(whitelistPattern);
+ }
+
+ // guard NPE
+ if (blacklistPattern == null) {
+ this.blacklistPattern = Collections.emptyList();
+ } else {
+ this.blacklistPattern = convertPatternStringsToPatternInstances(blacklistPattern);
+ }
+
+ if (this.whitelistPattern.isEmpty() && this.blacklistPattern.isEmpty()) {
+ noneSpecified = true;
+ } else if (!this.whitelistPattern.isEmpty() && !this.blacklistPattern.isEmpty()) {
+ throw new IllegalArgumentException("You have to specify either includes or excludes, or none.");
+ }
+
+ filterCache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build();
+ }
+
+ @Override
+ public boolean apply(IMetricsConsumer.DataPoint dataPoint) {
+ if (noneSpecified) {
+ return true;
+ }
+
+ String metricName = dataPoint.name;
+
+ Boolean cachedFilteredIn = filterCache.getIfPresent(metricName);
+ if (cachedFilteredIn != null) {
+ return cachedFilteredIn;
+ } else {
+ boolean filteredIn = isFilteredIn(metricName);
+ filterCache.put(metricName, filteredIn);
+ return filteredIn;
+ }
+ }
+
+ private ArrayList<Pattern> convertPatternStringsToPatternInstances(List<String> patterns) {
+ return Lists.newArrayList(Iterators.transform(patterns.iterator(), new Function<String, Pattern>() {
+ @Override
+ public Pattern apply(String s) {
+ return Pattern.compile(s);
+ }
+ }));
+ }
+
+ private boolean isFilteredIn(String metricName) {
+ if (!whitelistPattern.isEmpty()) {
+ return checkMatching(metricName, whitelistPattern, true);
+ } else if (!blacklistPattern.isEmpty()) {
+ return checkMatching(metricName, blacklistPattern, false);
+ }
+
+ throw new IllegalStateException("Shouldn't reach here");
+ }
+
+ private boolean checkMatching(String metricName, List<Pattern> patterns, boolean valueWhenMatched) {
+ for (Pattern pattern : patterns) {
+ if (pattern.matcher(metricName).find()) {
+ return valueWhenMatched;
+ }
+ }
+
+ return !valueWhenMatched;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
new file mode 100644
index 0000000..f12f706
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Predicate;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.io.Serializable;
+
+public interface MetricsFilter extends Predicate<IMetricsConsumer.DataPoint>, Serializable {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
new file mode 100644
index 0000000..d2f11d6
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.*;
+
+public class FilterByMetricNameTest {
+ @Before
+ public void setUp() throws Exception {
+
+ }
+
+ @Test
+ public void testWhitelist() {
+ List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+");
+ FilterByMetricName sut = new FilterByMetricName(whitelistPattern, null);
+
+ Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+ testMetricNamesAndExpected.put("storm.metric.hello", false);
+ testMetricNamesAndExpected.put("test.hello.world", false);
+ testMetricNamesAndExpected.put("test.hello.123", true);
+ testMetricNamesAndExpected.put("test.metric.world", false);
+ testMetricNamesAndExpected.put("metric.world", true);
+
+ assertTests(sut, testMetricNamesAndExpected);
+ }
+
+ @Test
+ public void testBlacklist() {
+ List<String> blacklistPattern = Lists.newArrayList("^__", "test\\.");
+ FilterByMetricName sut = new FilterByMetricName(null, blacklistPattern);
+
+ Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+ testMetricNamesAndExpected.put("__storm.metric.hello", false);
+ testMetricNamesAndExpected.put("storm.metric.__hello", true);
+ testMetricNamesAndExpected.put("test.hello.world", false);
+ testMetricNamesAndExpected.put("storm.test.123", false);
+ testMetricNamesAndExpected.put("metric.world", true);
+
+ assertTests(sut, testMetricNamesAndExpected);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBothWhitelistAndBlacklistAreSpecified() {
+ List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+");
+ List<String> blacklistPattern = Lists.newArrayList("^__", "test\\.");
+ new FilterByMetricName(whitelistPattern, blacklistPattern);
+ }
+
+ @Test
+ public void testNoneIsSpecified() {
+ FilterByMetricName sut = new FilterByMetricName(null, null);
+
+ Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+ testMetricNamesAndExpected.put("__storm.metric.hello", true);
+ testMetricNamesAndExpected.put("storm.metric.__hello", true);
+ testMetricNamesAndExpected.put("test.hello.world", true);
+ testMetricNamesAndExpected.put("storm.test.123", true);
+ testMetricNamesAndExpected.put("metric.world", true);
+
+ assertTests(sut, testMetricNamesAndExpected);
+ }
+
+ private void assertTests(FilterByMetricName sut, Map<String, Boolean> testMetricNamesAndExpected) {
+ for (Map.Entry<String, Boolean> testEntry : testMetricNamesAndExpected.entrySet()) {
+ assertEquals("actual filter result is not same: " + testEntry.getKey(),
+ testEntry.getValue(), sut.apply(new IMetricsConsumer.DataPoint(testEntry.getKey(), 1)));
+ }
+ }
+}
\ No newline at end of file
[2/5] storm git commit: STORM-1698 Asynchronous MetricsConsumerBolt
Posted by ka...@apache.org.
STORM-1698 Asynchronous MetricsConsumerBolt
* add new option "max.retain.metric.tuples" to the topology metrics consumer
* if the count of pending tasks exceeds this value, older metric tuples are discarded
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dd6de9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dd6de9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dd6de9c
Branch: refs/heads/master
Commit: 4dd6de9c494df112d668d66d8c10288becc54495
Parents: 79be212
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue May 3 15:12:38 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:26 2016 +0900
----------------------------------------------------------------------
conf/storm.yaml.example | 2 ++
.../src/jvm/org/apache/storm/daemon/StormCommon.java | 4 +++-
.../org/apache/storm/metric/MetricsConsumerBolt.java | 13 ++++++++++---
3 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 7df3e9d..0e8b354 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -41,8 +41,10 @@
## Metrics Consumers
# topology.metrics.consumer.register:
# - class: "org.apache.storm.metric.LoggingMetricsConsumer"
+# max.retain.metric.tuples: 100
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
+# max.retain.metric.tuples: 100
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 7792052..0dbb9f2 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -383,10 +383,12 @@ public class StormCommon {
for (Map<String, Object> info : registerInfo) {
String className = (String) info.get("class");
Object argument = info.get("argument");
+ Integer maxRetainMetricTuples = Utils.getInt(info.get("max.retain.metric.tuples"), 100);
Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1);
Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
- Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
+ Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
+ new MetricsConsumerBolt(className, argument, maxRetainMetricTuples), null, phintNum, metricsConsumerConf);
String id = className;
if (classOccurrencesMap.containsKey(className)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 0aeab34..95f9137 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -26,11 +26,9 @@ import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class MetricsConsumerBolt implements IBolt {
@@ -40,14 +38,16 @@ public class MetricsConsumerBolt implements IBolt {
String _consumerClassName;
OutputCollector _collector;
Object _registrationArgument;
+ private final int _maxRetainMetricTuples;
private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>();
private Thread _taskExecuteThread;
private volatile boolean _running = true;
- public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
+ public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) {
_consumerClassName = consumerClassName;
_registrationArgument = registrationArgument;
+ _maxRetainMetricTuples = maxRetainMetricTuples;
}
@Override
@@ -67,6 +67,13 @@ public class MetricsConsumerBolt implements IBolt {
@Override
public void execute(Tuple input) {
+ // remove older tasks if task queue exceeds the max size
+ if (_taskQueue.size() > _maxRetainMetricTuples) {
+ while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
+ _taskQueue.poll();
+ }
+ }
+
_taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)));
_collector.ack(input);
}
[4/5] storm git commit: Merge branch 'STORM-1700'
Posted by ka...@apache.org.
Merge branch 'STORM-1700'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78f728a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78f728a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78f728a3
Branch: refs/heads/master
Commit: 78f728a3d3c33f65c54eefd90f4911a044b030cb
Parents: ded5a0d 565d53d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jun 13 13:06:34 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:34 2016 +0900
----------------------------------------------------------------------
conf/storm.yaml.example | 11 ++
.../org/apache/storm/daemon/StormCommon.java | 9 +-
.../storm/metric/MetricsConsumerBolt.java | 85 +++++++++++++-
.../storm/metric/filter/FilterByMetricName.java | 110 +++++++++++++++++++
.../storm/metric/filter/MetricsFilter.java | 26 +++++
.../metric/filter/FilterByMetricNameTest.java | 95 ++++++++++++++++
6 files changed, 332 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[5/5] storm git commit: add STORM-1698 and STORM-1700 to CHANGELOG
Posted by ka...@apache.org.
add STORM-1698 and STORM-1700 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d42276f6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d42276f6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d42276f6
Branch: refs/heads/master
Commit: d42276f6ffa0364d79aa9b48993d85fea06b6d28
Parents: 78f728a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jun 13 13:14:50 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:15:10 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d42276f6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b44fd1..42f35c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -105,6 +105,8 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-1700: Introduce 'whitelist' / 'blacklist' option to MetricsConsumer
+ * STORM-1698: Asynchronous MetricsConsumerBolt
* STORM-1887: Fixed help message for set_log_level command
* STORM-1841: Address a few minor issues in windowing and doc
* STORM-1709: Added group by support in storm sql standalone mode
@@ -520,4 +522,4 @@
* STORM-789: Send more topology context to Multi-Lang components via initial handshake
* STORM-788: UI Fix key for process latencies
* STORM-787: test-ns should announce test failures with 'BUILD FAILURE'
- * STORM-786: KafkaBolt shoul
\ No newline at end of file
+ * STORM-786: KafkaBolt shoul
[3/5] storm git commit: STORM-1698 Asynchronous MetricsConsumerBolt
Posted by ka...@apache.org.
STORM-1698 Asynchronous MetricsConsumerBolt
* change MetricsConsumerBolt to handle metrics asynchronously
* to avoid bad side effects on the topology
* for details please refer to the JIRA issue: https://issues.apache.org/jira/browse/STORM-1698
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/79be212b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/79be212b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/79be212b
Branch: refs/heads/master
Commit: 79be212ba81dfbc4f0f8776a4427a986906aefa2
Parents: ded5a0d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Apr 8 13:05:00 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jun 13 13:06:26 2016 +0900
----------------------------------------------------------------------
.../storm/metric/MetricsConsumerBolt.java | 56 +++++++++++++++++++-
1 file changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/79be212b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 1f3217f..0aeab34 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -23,15 +23,28 @@ import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
public class MetricsConsumerBolt implements IBolt {
+ public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
+
IMetricsConsumer _metricsConsumer;
String _consumerClassName;
OutputCollector _collector;
Object _registrationArgument;
+ private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>();
+ private Thread _taskExecuteThread;
+ private volatile boolean _running = true;
+
public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
_consumerClassName = consumerClassName;
_registrationArgument = registrationArgument;
@@ -47,17 +60,56 @@ public class MetricsConsumerBolt implements IBolt {
}
_metricsConsumer.prepare(stormConf, _registrationArgument, context, collector);
_collector = collector;
+ _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
+ _taskExecuteThread.setDaemon(true);
+ _taskExecuteThread.start();
}
@Override
public void execute(Tuple input) {
- _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
+ _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)));
_collector.ack(input);
}
@Override
public void cleanup() {
+ _running = false;
_metricsConsumer.cleanup();
+ _taskExecuteThread.interrupt();
+ }
+
+ class MetricsTask {
+ private IMetricsConsumer.TaskInfo taskInfo;
+ private Collection<IMetricsConsumer.DataPoint> dataPoints;
+
+ public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
+ this.taskInfo = taskInfo;
+ this.dataPoints = dataPoints;
+ }
+
+ public IMetricsConsumer.TaskInfo getTaskInfo() {
+ return taskInfo;
+ }
+
+ public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
+ return dataPoints;
+ }
+ }
+
+ class MetricsHandlerRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ while (_running) {
+ try {
+ MetricsTask task = _taskQueue.take();
+ _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
+ } catch (InterruptedException e) {
+ break;
+ } catch (Throwable t) {
+ LOG.error("Exception occurred during handle metrics", t);
+ }
+ }
+ }
}
-
}