You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/07/27 23:07:40 UTC
[skywalking] 01/01: Add AvgLabeledFunction to ingest multiple labels
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch meter-label
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 4e1008ee2884711fdf2e75038ada9c87e73bd051
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Jul 28 06:55:50 2020 +0800
Add AvgLabeledFunction to ingest multiple labels
---
.../meter/function/AvgLabeledFunction.java | 215 +++++++++++++++++++++
.../promethues/PrometheusMetricConverter.java | 38 +++-
.../metric/promethues/operation/MetricSource.java | 3 +
.../metric/promethues/rule/PrometheusMetric.java | 1 +
.../server/core/metric/promethues/rule/Rules.java | 1 +
.../meter/function/AvgLabeledFunctionTest.java | 112 +++++++++++
.../provider/PrometheusFetcherProvider.java | 3 +
7 files changed, 369 insertions(+), 4 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgLabeledFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgLabeledFunction.java
new file mode 100644
index 0000000..a8b9ad1
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgLabeledFunction.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@MeterFunction(functionName = "avgLabeled")
+@ToString
+public abstract class AvgLabeledFunction extends Metrics implements AcceptableValue<DataTable> {
+ protected static final String SUMMATION = "summation";
+ protected static final String COUNT = "count";
+ protected static final String VALUE = "value";
+
+ @Setter
+ @Getter
+ @Column(columnName = ENTITY_ID, length = 512)
+ private String entityId;
+
+ /**
+ * Service ID is required for sort query.
+ */
+ @Setter
+ @Getter
+ @Column(columnName = InstanceTraffic.SERVICE_ID)
+ private String serviceId;
+
+ @Getter
+ @Setter
+ @Column(columnName = SUMMATION, storageOnly = true)
+ protected DataTable summation = new DataTable(30);
+ @Getter
+ @Setter
+ @Column(columnName = COUNT, storageOnly = true)
+ protected DataTable count = new DataTable(30);
+ @Getter
+ @Setter
+ @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+ private DataTable value = new DataTable(30);
+
+ @Override
+ public final void combine(Metrics metrics) {
+ AvgLabeledFunction longAvgMetrics = (AvgLabeledFunction) metrics;
+ summation.append(longAvgMetrics.summation);
+ count.append(longAvgMetrics.count);
+ }
+
+ @Override
+ public final void calculate() {
+ List<String> keys = count.sortedKeys(Comparator.naturalOrder());
+ for (String key : keys) {
+ Long s = summation.get(key);
+ if (Objects.isNull(s)) {
+ continue;
+ }
+ Long c = count.get(key);
+ if (Objects.isNull(c)) {
+ continue;
+ }
+ long result = s / c;
+ if (result == 0 && s > 0) {
+ result = 1;
+ }
+ value.put(key, result);
+ }
+ }
+
+ @Override
+ public Metrics toHour() {
+ AvgLabeledFunction metrics = (AvgLabeledFunction) createNew();
+ metrics.setEntityId(getEntityId());
+ metrics.setTimeBucket(toTimeBucketInHour());
+ metrics.setServiceId(getServiceId());
+ metrics.setSummation(getSummation());
+ metrics.setCount(getCount());
+ return metrics;
+ }
+
+ @Override
+ public Metrics toDay() {
+ AvgLabeledFunction metrics = (AvgLabeledFunction) createNew();
+ metrics.setEntityId(getEntityId());
+ metrics.setTimeBucket(toTimeBucketInDay());
+ metrics.setServiceId(getServiceId());
+ metrics.setSummation(getSummation());
+ metrics.setCount(getCount());
+ return metrics;
+ }
+
+ @Override
+ public int remoteHashCode() {
+ return entityId.hashCode();
+ }
+
+ @Override
+ public void deserialize(final RemoteData remoteData) {
+ this.setCount(new DataTable(remoteData.getDataObjectStrings(0)));
+ this.setSummation(new DataTable(remoteData.getDataObjectStrings(1)));
+ setTimeBucket(remoteData.getDataLongs(0));
+
+ this.entityId = remoteData.getDataStrings(0);
+ this.serviceId = remoteData.getDataStrings(1);
+ }
+
+ @Override
+ public RemoteData.Builder serialize() {
+ RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+ remoteBuilder.addDataObjectStrings(count.toStorageData());
+ remoteBuilder.addDataObjectStrings(summation.toStorageData());
+ remoteBuilder.addDataLongs(getTimeBucket());
+
+ remoteBuilder.addDataStrings(entityId);
+ remoteBuilder.addDataStrings(serviceId);
+
+ return remoteBuilder;
+ }
+
+ @Override
+ public String id() {
+ return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+ }
+
+ @Override
+ public void accept(final MeterEntity entity, final DataTable value) {
+ this.entityId = entity.id();
+ this.serviceId = entity.serviceId();
+ this.summation.append(value);
+ DataTable c = new DataTable();
+ value.sortedKeys(Comparator.naturalOrder()).forEach(key -> c.put(key, 1L));
+ this.count.append(c);
+ }
+
+ @Override
+ public Class<? extends AvgLabeledStorageBuilder> builder() {
+ return AvgLabeledStorageBuilder.class;
+ }
+
+ public static class AvgLabeledStorageBuilder implements StorageBuilder<AvgLabeledFunction> {
+ @Override
+ public AvgLabeledFunction map2Data(final Map<String, Object> dbMap) {
+ AvgLabeledFunction metrics = new AvgLabeledFunction() {
+ @Override
+ public AcceptableValue<DataTable> createNew() {
+ throw new UnexpectedException("createNew should not be called");
+ }
+ };
+ metrics.setSummation(new DataTable((String) dbMap.get(SUMMATION)));
+ metrics.setValue(new DataTable((String) dbMap.get(VALUE)));
+ metrics.setCount(new DataTable((String) dbMap.get(COUNT)));
+ metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+ metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
+ metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+ return metrics;
+ }
+
+ @Override
+ public Map<String, Object> data2Map(final AvgLabeledFunction storageData) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(SUMMATION, storageData.getSummation());
+ map.put(VALUE, storageData.getValue());
+ map.put(COUNT, storageData.getCount());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+ map.put(ENTITY_ID, storageData.getEntityId());
+ return map;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (!(o instanceof AvgLabeledFunction))
+ return false;
+ AvgLabeledFunction function = (AvgLabeledFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
index 8e50f50..c564c1a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.metric.promethues;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.vavr.Function1;
@@ -25,6 +26,7 @@ import io.vavr.Tuple;
import io.vavr.Tuple3;
import io.vavr.control.Try;
import java.math.BigDecimal;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -45,6 +47,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.metric.promethues.counter.Window;
import org.apache.skywalking.oap.server.core.metric.promethues.operation.MetricSource;
@@ -71,6 +74,8 @@ public class PrometheusMetricConverter {
private final static String AVG = "avg";
+ private final static String AVG_LABELED = "avgLabeled";
+
private final Window window = new Window();
private final List<MetricsRule> rules;
@@ -132,6 +137,7 @@ public class PrometheusMetricConverter {
.timestamp(tuple._4.getTimestamp())
.scale(tuple._3.getScale())
.counterFunction(tuple._3.getCounterFunction())
+ .groupBy(tuple._3.getGroupBy())
.range(tuple._3.getRange());
switch (tuple._1.getScope()) {
case SERVICE:
@@ -155,10 +161,23 @@ public class PrometheusMetricConverter {
case AVG:
sources.forEach((source, metrics) -> {
AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
- Double sumDouble = sum(metrics).value();
- sumDouble = window.get(source.getPromMetricName()).apply(source, sumDouble);
- value.accept(source.getEntity(), BigDecimal.valueOf(Double.isNaN(sumDouble) ? 0D : sumDouble)
- .multiply(BigDecimal.TEN.pow(source.getScale())).longValue());
+ value.accept(source.getEntity(), sum(metrics, source));
+ value.setTimeBucket(TimeBucket.getMinuteTimeBucket(source.getTimestamp()));
+ log.debug("Input metric {}", value.getTimeBucket());
+ service.doStreamingCalculation(value);
+
+ generateTraffic(source.getEntity());
+ });
+ break;
+ case AVG_LABELED:
+ sources.forEach((source, metrics) -> {
+ Preconditions.checkArgument(Objects.nonNull(source.getGroupBy()));
+ DataTable dt = new DataTable();
+ metrics.stream()
+ .collect(groupingBy(m -> source.getGroupBy().stream().map(m.getLabels()::get).collect(Collectors.joining("-"))))
+ .forEach((group, mm) -> dt.put(group, sum(mm, source, ImmutableMap.of("group", group))));
+ AcceptableValue<DataTable> value = service.buildMetrics(formatMetricName(operation.getMetricName()), DataTable.class);
+ value.accept(source.getEntity(), dt);
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(source.getTimestamp()));
log.debug("Input metric {}", value.getTimeBucket());
service.doStreamingCalculation(value);
@@ -228,6 +247,17 @@ public class PrometheusMetricConverter {
return metrics.stream().reduce(Metric::sum).orElseThrow(IllegalArgumentException::new);
}
+ private long sum(List<Metric> metrics, MetricSource source) {
+ return sum(metrics, source, Collections.emptyMap());
+ }
+
+ private long sum(List<Metric> metrics, MetricSource source, Map<String, String> labels) {
+ Double sumDouble = sum(metrics).value();
+ sumDouble = window.get(source.getPromMetricName(), labels).apply(source, sumDouble);
+ return BigDecimal.valueOf(Double.isNaN(sumDouble) ? 0D : sumDouble)
+ .multiply(BigDecimal.TEN.pow(source.getScale())).longValue();
+ }
+
private void generateTraffic(MeterEntity entity) {
ServiceTraffic s = new ServiceTraffic();
s.setName(requireNonNull(entity.getServiceName()));
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/operation/MetricSource.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/operation/MetricSource.java
index ed503a2..983077b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/operation/MetricSource.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/operation/MetricSource.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.metric.promethues.operation;
+import java.util.List;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -41,4 +42,6 @@ public class MetricSource {
private final String range;
private final int scale;
+
+ private final List<String> groupBy;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/PrometheusMetric.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/PrometheusMetric.java
index f02baee..4c5cdd0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/PrometheusMetric.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/PrometheusMetric.java
@@ -28,6 +28,7 @@ public class PrometheusMetric {
private CounterFunction counterFunction;
private String range;
private List<LabelMatchRule> labelFilter;
+ private List<String> groupBy;
private Relabel relabel;
private int scale = 0;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/Rules.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/Rules.java
index 6e39e72..91537c4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/Rules.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/rule/Rules.java
@@ -44,6 +44,7 @@ public class Rules {
throw new ModuleStartException("Load fetcher rules failed", e);
}
return Arrays.stream(rules)
+ .filter(File::isFile)
.map(f -> {
try (Reader r = new FileReader(f)) {
Rule rule = new Yaml().loadAs(r, Rule.class);
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgLabeledFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgLabeledFunctionTest.java
new file mode 100644
index 0000000..df1fbe0
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgLabeledFunctionTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import io.vavr.collection.Stream;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.apache.skywalking.oap.server.core.analysis.meter.function.AvgLabeledFunction.COUNT;
+import static org.apache.skywalking.oap.server.core.analysis.meter.function.AvgLabeledFunction.SUMMATION;
+import static org.apache.skywalking.oap.server.core.analysis.meter.function.AvgLabeledFunction.VALUE;
+import static org.hamcrest.core.Is.is;
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AvgLabeledFunctionTest {
+ @Spy
+ private AvgLabeledFunction function;
+
+ @Test
+ public void testAccept() {
+ function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
+ assertResult(asList("200", "404"), asList(10L, 2L), asList(1L, 1L));
+ function.accept(MeterEntity.newService("request_count"), build(asList("200", "500"), asList(2L, 3L)));
+ assertResult(asList("200", "404", "500"), asList(12L, 2L, 3L), asList(2L, 1L, 1L));
+ }
+
+ @Test
+ public void testCalculate() {
+ function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
+ function.accept(MeterEntity.newService("request_count"), build(asList("200", "500"), asList(2L, 3L)));
+ function.calculate();
+
+ assertThat(function.getValue().sortedKeys(Comparator.naturalOrder()), is(asList("200", "404", "500")));
+ assertThat(function.getValue().sortedValues(Comparator.naturalOrder()), is(asList(6L, 2L, 3L)));
+ }
+
+ @Test
+ public void testSerialize() {
+ function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
+ AvgLabeledFunction function2 = Mockito.spy(AvgLabeledFunction.class);
+ function2.deserialize(function.serialize().build());
+ assertThat(function2.getEntityId(), is(function.getEntityId()));
+ assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+ }
+
+ @Test
+ public void testBuilder() throws IllegalAccessException, InstantiationException {
+ function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
+ function.calculate();
+ StorageBuilder<AvgLabeledFunction> storageBuilder = function.builder().newInstance();
+
+ Map<String, Object> map = storageBuilder.data2Map(function);
+ map.put(SUMMATION, ((DataTable) map.get(SUMMATION)).toStorageData());
+ map.put(COUNT, ((DataTable) map.get(COUNT)).toStorageData());
+ map.put(VALUE, ((DataTable) map.get(VALUE)).toStorageData());
+
+ AvgLabeledFunction function2 = storageBuilder.map2Data(map);
+ assertThat(function2.getValue(), is(function.getValue()));
+ }
+
+ private DataTable build(List<String> keys, List<Long> values) {
+ DataTable result = new DataTable();
+ Stream.ofAll(keys).forEachWithIndex((key, i) -> result.put(key, values.get(i)));
+ return result;
+ }
+
+ private void assertResult(List<String> expectedKeys, List<Long> expectedValues, List<Long> expectedCount) {
+ assertSummation(expectedKeys, expectedValues);
+ assertCount(expectedKeys, expectedCount);
+ }
+
+ private void assertCount(List<String> expectedKeys, List<Long> expectedCount) {
+ List<String> keys = function.getCount().sortedKeys(Comparator.comparingInt(Integer::parseInt));
+ assertThat(keys, is(expectedKeys));
+ List<Long> values = function.getCount().sortedValues(Comparator.comparingLong(Long::parseLong));
+ assertThat(values, is(expectedCount));
+ }
+
+ private void assertSummation(List<String> expectedKeys, List<Long> expectedValues) {
+ List<String> keys = function.getSummation().sortedKeys(Comparator.comparingInt(Integer::parseInt));
+ assertThat(keys, is(expectedKeys));
+ List<Long> values = function.getSummation().sortedValues(Comparator.comparingLong(Long::parseLong));
+ assertThat(values, is(expectedValues));
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index 1c3178d..ffd3c76 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -127,6 +127,9 @@ public class PrometheusFetcherProvider extends ModuleProvider {
while ((mf = p.parse(now)) != null) {
result.addAll(mf.getMetrics().stream()
.peek(metric -> {
+ if (Objects.isNull(sc.getLabels())) {
+ return;
+ }
Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels());
extraLabels.put("instance", url);
extraLabels.forEach((key, value) -> {