You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/08/15 09:28:53 UTC
[skywalking] branch master updated: Add latest function (#5321)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 68eba15 Add latest function (#5321)
68eba15 is described below
commit 68eba154c442c9a36d4836a15a4993e5fba11404
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Aug 15 17:28:32 2020 +0800
Add latest function (#5321)
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.../analysis/meter/function/LatestFunction.java | 180 +++++++++++++++++++++
.../promethues/PrometheusMetricConverter.java | 20 ++-
.../oap/server/core/query/MetricsQueryService.java | 2 +-
.../oap/server/core/query/sql/Function.java | 2 +-
.../oap/server/core/query/type/IntValues.java | 5 +
.../core/storage/query/IMetricsQueryDAO.java | 5 +-
.../meter/function/LatestFunctionTest.java | 82 ++++++++++
.../oap/query/graphql/resolver/MetricsQuery.java | 2 +-
.../src/main/resources/query-protocol | 2 +-
.../elasticsearch/query/MetricsQueryEsDAO.java | 16 +-
.../elasticsearch7/query/MetricsQueryEs7DAO.java | 20 ++-
.../plugin/influxdb/query/MetricsQuery.java | 10 +-
.../plugin/jdbc/h2/dao/H2MetricsQueryDAO.java | 10 +-
13 files changed, 329 insertions(+), 27 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java
new file mode 100644
index 0000000..17f838e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
+import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+/**
+ * Meter function registered under the name {@code latest}. Each accepted or combined value
+ * overwrites the one held before, so the metric always carries the value it received last.
+ */
+@MeterFunction(functionName = "latest")
+@ToString
+public abstract class LatestFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
+    protected static final String VALUE = "value";
+
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID, length = 512)
+    private String entityId;
+
+    /**
+     * Service ID is required for sort query.
+     */
+    @Setter
+    @Getter
+    @Column(columnName = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
+
+    @Getter
+    @Setter
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Latest)
+    private long value;
+
+    /**
+     * Records the identity of the given entity and replaces the held value with {@code value}.
+     */
+    @Override
+    public void accept(MeterEntity entity, Long value) {
+        this.entityId = entity.id();
+        this.serviceId = entity.serviceId();
+        this.value = value;
+    }
+
+    /**
+     * Replaces the held value with the given one.
+     */
+    @Entrance
+    public final void combine(@SourceFrom long value) {
+        this.value = value;
+    }
+
+    /**
+     * Takes over the value held by another {@link LatestFunction}.
+     */
+    @Override
+    public final void combine(Metrics metrics) {
+        combine(((LatestFunction) metrics).value);
+    }
+
+    @Override
+    public void calculate() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Creates a new instance via {@code createNew()} carrying this metric's entity id, service id
+     * and value, with the time bucket taken from {@code toTimeBucketInHour()}.
+     */
+    @Override
+    public Metrics toHour() {
+        final LatestFunction hourly = (LatestFunction) createNew();
+        hourly.setEntityId(getEntityId());
+        hourly.setTimeBucket(toTimeBucketInHour());
+        hourly.setServiceId(getServiceId());
+        hourly.setValue(getValue());
+        return hourly;
+    }
+
+    /**
+     * Creates a new instance via {@code createNew()} carrying this metric's entity id, service id
+     * and value, with the time bucket taken from {@code toTimeBucketInDay()}.
+     */
+    @Override
+    public Metrics toDay() {
+        final LatestFunction daily = (LatestFunction) createNew();
+        daily.setEntityId(getEntityId());
+        daily.setTimeBucket(toTimeBucketInDay());
+        daily.setServiceId(getServiceId());
+        daily.setValue(getValue());
+        return daily;
+    }
+
+    /**
+     * Hash used for remote dispatch; derived from the entity id only.
+     */
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    /**
+     * Restores the state written by {@link #serialize()}: longs are (value, time bucket),
+     * strings are (entity id, service id).
+     */
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        value = remoteData.getDataLongs(0);
+        setTimeBucket(remoteData.getDataLongs(1));
+
+        entityId = remoteData.getDataStrings(0);
+        serviceId = remoteData.getDataStrings(1);
+    }
+
+    /**
+     * Writes longs in the order (value, time bucket) and strings in the order
+     * (entity id, service id); {@link #deserialize(RemoteData)} reads them back by these indexes.
+     */
+    @Override
+    public RemoteData.Builder serialize() {
+        return RemoteData.newBuilder()
+                         .addDataLongs(value)
+                         .addDataLongs(getTimeBucket())
+                         .addDataStrings(entityId)
+                         .addDataStrings(serviceId);
+    }
+
+    /**
+     * Builds the id from the time bucket and the entity id, joined by {@link Const#ID_CONNECTOR}.
+     */
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public Class<? extends LastestStorageBuilder> builder() {
+        return LastestStorageBuilder.class;
+    }
+
+    /**
+     * Converts between {@link LatestFunction} instances and their column-name keyed map form.
+     */
+    public static class LastestStorageBuilder implements StorageBuilder<LatestFunction> {
+        /**
+         * Rebuilds a metric from stored columns. The returned instance cannot create further
+         * instances: its {@code createNew()} always throws {@link UnexpectedException}.
+         */
+        @Override
+        public LatestFunction map2Data(final Map<String, Object> dbMap) {
+            final LatestFunction restored = new LatestFunction() {
+                @Override
+                public AcceptableValue<Long> createNew() {
+                    throw new UnexpectedException("createNew should not be called");
+                }
+            };
+            restored.setValue(((Number) dbMap.get(VALUE)).longValue());
+            restored.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            restored.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
+            restored.setEntityId((String) dbMap.get(ENTITY_ID));
+            return restored;
+        }
+
+        /**
+         * Writes value, time bucket, service id and entity id into a new map keyed by column name.
+         */
+        @Override
+        public Map<String, Object> data2Map(final LatestFunction storageData) {
+            final Map<String, Object> columns = new HashMap<>();
+            columns.put(VALUE, storageData.getValue());
+            columns.put(TIME_BUCKET, storageData.getTimeBucket());
+            columns.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+            columns.put(ENTITY_ID, storageData.getEntityId());
+            return columns;
+        }
+    }
+
+    /**
+     * Two instances are equal when they share the same entity id and time bucket; the held value
+     * is not part of the comparison.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof LatestFunction)) {
+            return false;
+        }
+        final LatestFunction other = (LatestFunction) o;
+        return getTimeBucket() == other.getTimeBucket()
+            && Objects.equals(entityId, other.entityId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityId, getTimeBucket());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
index 366b802..449d525 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
@@ -76,6 +76,8 @@ public class PrometheusMetricConverter {
private final static String AVG_LABELED = "avgLabeled";
+ private final static String LATEST = "latest";
+
private final Window window = new Window();
private final List<MetricsRule> rules;
@@ -124,7 +126,7 @@ public class PrometheusMetricConverter {
}
return rule._3.getLabelFilter().stream()
.allMatch(matchRule -> matchRule.getOptions().stream()
- .anyMatch(metric.getLabels().get(matchRule.getKey())::matches));
+ .anyMatch(option -> matchLabel(option, metric.getLabels().get(matchRule.getKey()))));
})
.map(rule -> Tuple.of(rule._1, rule._2, rule._3, metric))
)
@@ -158,6 +160,7 @@ public class PrometheusMetricConverter {
log.debug("Building metrics {} -> {}", operation, sources);
Try.run(() -> {
switch (operation.getName()) {
+ case LATEST:
case AVG:
sources.forEach((source, metrics) -> {
AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
@@ -287,4 +290,19 @@ public class PrometheusMetricConverter {
.onFailure(e -> log.debug(debugMessage + " failed", e))
.toJavaStream();
}
+
+ private boolean matchLabel(String option, String labelValue) {
+ if (option.startsWith("!")) {
+ return !matchLabelRule(option.substring(1), labelValue);
+ }
+ log.debug("{} {}", option, labelValue);
+ return matchLabelRule(option, labelValue);
+ }
+
+    /**
+     * Evaluates a single (non-negated) rule against a label value.
+     *
+     * @param rule       regular expression the label value must match in full; a null or empty
+     *                   rule matches only a null or empty label value
+     * @param labelValue value of the label on the metric, or null when the metric lacks the label
+     * @return true when the label value satisfies the rule
+     */
+    private boolean matchLabelRule(String rule, String labelValue) {
+        if (Strings.isNullOrEmpty(rule)) {
+            return Strings.isNullOrEmpty(labelValue);
+        }
+        // A metric without the label cannot satisfy a non-empty pattern; guard against the
+        // NullPointerException that calling matches() on a null value would raise.
+        return labelValue != null && labelValue.matches(rule);
+    }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
index f44952c..fb6b73c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
@@ -50,7 +50,7 @@ public class MetricsQueryService implements Service {
/**
* Read metrics single value in the duration of required metrics
*/
- public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
+ public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
return getMetricQueryDAO().readMetricsValue(
condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
index aae9c58..68c9644 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
@@ -19,5 +19,5 @@
package org.apache.skywalking.oap.server.core.query.sql;
public enum Function {
- None, Avg, Sum
+ None, Avg, Sum, Latest
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
index 53fdf91..1d51297 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.query.type;
+import io.vavr.collection.Stream;
import java.util.ArrayList;
import java.util.List;
@@ -36,4 +37,8 @@ public class IntValues {
}
return defaultValue;
}
+
+    /**
+     * Returns the last value in {@code values} that differs from {@code defaultValue}, or
+     * {@code defaultValue} itself when no such value exists.
+     */
+    public long latestValue(int defaultValue) {
+        final long fallback = defaultValue;
+        return Stream.ofAll(values)
+                     .map(KVInt::getValue)
+                     .findLast(candidate -> candidate != defaultValue)
+                     .getOrElse(fallback);
+    }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
index 112a1ea..548d5c1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.query;
+import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -44,7 +45,7 @@ import static java.util.stream.Collectors.toList;
* @since 8.0.0
*/
public interface IMetricsQueryDAO extends DAO {
- int readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
+ long readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
MetricsValues readMetricsValues(MetricsCondition condition,
String valueColumnName,
@@ -95,7 +96,7 @@ public interface IMetricsQueryDAO extends DAO {
final List<String> ids,
final Map<String, DataTable> idMap) {
List<String> allLabels;
- if (Objects.isNull(labels) || labels.size() < 1) {
+ if (Objects.isNull(labels) || labels.size() < 1 || labels.stream().allMatch(Strings::isNullOrEmpty)) {
allLabels = idMap.values().stream()
.flatMap(dataTable -> dataTable.keys().stream())
.distinct().collect(Collectors.toList());
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java
new file mode 100644
index 0000000..67c0683
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LatestFunction}, exercised through a Mockito spy of the abstract class.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class LatestFunctionTest {
+
+    @Spy
+    private LatestFunction function;
+
+    /**
+     * Each accepted value replaces the previous one.
+     */
+    @Test
+    public void testAccept() {
+        long time = 1597113318673L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        assertThat(function.getValue(), is(time));
+        time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        assertThat(function.getValue(), is(time));
+    }
+
+    /**
+     * calculate() leaves the most recently accepted value in place.
+     */
+    @Test
+    public void testCalculate() {
+        long time1 = 1597113318673L;
+        long time2 = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time1);
+        function.accept(MeterEntity.newService("latest_sync_time"), time2);
+        function.calculate();
+        assertThat(function.getValue(), is(time2));
+    }
+
+    /**
+     * A serialize/deserialize round trip preserves every transferred field.
+     */
+    @Test
+    public void testSerialize() {
+        long time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        LatestFunction function2 = Mockito.spy(LatestFunction.class);
+        function2.deserialize(function.serialize().build());
+        assertThat(function2.getEntityId(), is(function.getEntityId()));
+        assertThat(function2.getServiceId(), is(function.getServiceId()));
+        assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+        // The value is the payload this function exists to carry; it must survive the round trip.
+        assertThat(function2.getValue(), is(function.getValue()));
+    }
+
+    /**
+     * The storage builder's data2Map/map2Data round trip preserves the stored fields.
+     */
+    @Test
+    public void testBuilder() throws IllegalAccessException, InstantiationException {
+        long time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        function.calculate();
+        StorageBuilder<LatestFunction> storageBuilder = function.builder().newInstance();
+
+        Map<String, Object> map = storageBuilder.data2Map(function);
+
+        LatestFunction function2 = storageBuilder.map2Data(map);
+        assertThat(function2.getValue(), is(function.getValue()));
+        assertThat(function2.getEntityId(), is(function.getEntityId()));
+        assertThat(function2.getServiceId(), is(function.getServiceId()));
+        assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
index af72077..3c3d40a 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
@@ -102,7 +102,7 @@ public class MetricsQuery implements GraphQLQueryResolver {
/**
* Read metrics single value in the duration of required metrics
*/
- public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
+ public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
return 0;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index f38def1..5ecefdf 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a
+Subproject commit 5ecefdf2c16ca16d4973806bdd26f9eafa3faf1d
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 9e43af3..c68aaef 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -57,16 +57,20 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
}
@Override
- public int readMetricsValue(final MetricsCondition condition,
+ public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
buildQuery(sourceBuilder, condition, duration);
+ int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+ if (function == Function.Latest) {
+ return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+ }
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.size(1);
- final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
functionAggregation(function, entityIdAggregation, valueColumnName);
sourceBuilder.aggregation(entityIdAggregation);
@@ -78,16 +82,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
switch (function) {
case Sum:
Sum sum = idBucket.getAggregations().get(valueColumnName);
- return (int) sum.getValue();
+ return (long) sum.getValue();
case Avg:
Avg avg = idBucket.getAggregations().get(valueColumnName);
- return (int) avg.getValue();
+ return (long) avg.getValue();
default:
avg = idBucket.getAggregations().get(valueColumnName);
- return (int) avg.getValue();
+ return (long) avg.getValue();
}
}
- return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ return defaultValue;
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
index 977c3e0..d64665a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
@@ -46,16 +46,20 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
}
@Override
- public int readMetricsValue(final MetricsCondition condition,
+ public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
buildQuery(sourceBuilder, condition, duration);
- TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
- .field(Metrics.ENTITY_ID)
- .size(1);
+ int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+ if (function == Function.Latest) {
+ return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+ }
+ TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
+ .field(Metrics.ENTITY_ID)
+ .size(1);
functionAggregation(function, entityIdAggregation, valueColumnName);
sourceBuilder.aggregation(entityIdAggregation);
@@ -67,16 +71,16 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
switch (function) {
case Sum:
Sum sum = idBucket.getAggregations().get(valueColumnName);
- return (int) sum.getValue();
+ return (long) sum.getValue();
case Avg:
Avg avg = idBucket.getAggregations().get(valueColumnName);
- return (int) avg.getValue();
+ return (long) avg.getValue();
default:
avg = idBucket.getAggregations().get(valueColumnName);
- return (int) avg.getValue();
+ return (long) avg.getValue();
}
}
- return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ return defaultValue;
}
protected void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
index 5fa5832..a78b9dc 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
@@ -60,10 +60,14 @@ public class MetricsQuery implements IMetricsQueryDAO {
}
@Override
- public int readMetricsValue(final MetricsCondition condition,
+ public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
+ int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+ if (function == Function.Latest) {
+ return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+ }
final String measurement = condition.getName();
SelectionQueryImpl query = select();
@@ -93,11 +97,11 @@ public class MetricsQuery implements IMetricsQueryDAO {
if (CollectionUtils.isNotEmpty(seriesList)) {
for (QueryResult.Series series : seriesList) {
Number value = (Number) series.getValues().get(0).get(1);
- return value.intValue();
+ return value.longValue();
}
}
- return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ return defaultValue;
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
index f2b0f32..97f93e7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
@@ -49,10 +49,14 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
}
@Override
- public int readMetricsValue(final MetricsCondition condition,
+ public long readMetricsValue(final MetricsCondition condition,
String valueColumnName,
final Duration duration) throws IOException {
+ int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+ if (function == Function.Latest) {
+ return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+ }
String op;
switch (function) {
case Avg:
@@ -80,13 +84,13 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
parameters.toArray(new Object[0])
)) {
while (resultSet.next()) {
- return resultSet.getInt("value");
+ return resultSet.getLong("value");
}
}
} catch (SQLException e) {
throw new IOException(e);
}
- return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ return defaultValue;
}
@Override