You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/08/15 09:28:53 UTC

[skywalking] branch master updated: Add latest function (#5321)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 68eba15  Add latest function (#5321)
68eba15 is described below

commit 68eba154c442c9a36d4836a15a4993e5fba11404
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Aug 15 17:28:32 2020 +0800

    Add latest function (#5321)
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../analysis/meter/function/LatestFunction.java    | 180 +++++++++++++++++++++
 .../promethues/PrometheusMetricConverter.java      |  20 ++-
 .../oap/server/core/query/MetricsQueryService.java |   2 +-
 .../oap/server/core/query/sql/Function.java        |   2 +-
 .../oap/server/core/query/type/IntValues.java      |   5 +
 .../core/storage/query/IMetricsQueryDAO.java       |   5 +-
 .../meter/function/LatestFunctionTest.java         |  82 ++++++++++
 .../oap/query/graphql/resolver/MetricsQuery.java   |   2 +-
 .../src/main/resources/query-protocol              |   2 +-
 .../elasticsearch/query/MetricsQueryEsDAO.java     |  16 +-
 .../elasticsearch7/query/MetricsQueryEs7DAO.java   |  20 ++-
 .../plugin/influxdb/query/MetricsQuery.java        |  10 +-
 .../plugin/jdbc/h2/dao/H2MetricsQueryDAO.java      |  10 +-
 13 files changed, 329 insertions(+), 27 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java
new file mode 100644
index 0000000..17f838e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
+import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@MeterFunction(functionName = "latest")
+@ToString
+public abstract class LatestFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
+    protected static final String VALUE = "value";
+
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID, length = 512)
+    private String entityId;
+
+    /**
+     * Service ID is required for sort query.
+     */
+    @Setter
+    @Getter
+    @Column(columnName = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
+
+    @Getter
+    @Setter
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Latest)
+    private long value;
+
+    @Override public void accept(MeterEntity entity, Long value) {
+        this.entityId = entity.id();
+        this.serviceId = entity.serviceId();
+        this.value = value;
+    }
+
+    @Entrance public final void combine(@SourceFrom long value) {
+        this.value = value;
+    }
+
+    @Override public final void combine(Metrics metrics) {
+        LatestFunction latestFunction = (LatestFunction) metrics;
+        combine(latestFunction.value);
+    }
+
+    @Override public void calculate() {
+
+    }
+
+    @Override
+    public Metrics toHour() {
+        LatestFunction metrics = (LatestFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setServiceId(getServiceId());
+        metrics.setValue(getValue());
+        return metrics;
+    }
+
+    @Override
+    public Metrics toDay() {
+        LatestFunction metrics = (LatestFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setServiceId(getServiceId());
+        metrics.setValue(getValue());
+        return metrics;
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        this.value = remoteData.getDataLongs(0);
+        setTimeBucket(remoteData.getDataLongs(1));
+
+        this.entityId = remoteData.getDataStrings(0);
+        this.serviceId = remoteData.getDataStrings(1);
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.addDataLongs(value);
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        remoteBuilder.addDataStrings(entityId);
+        remoteBuilder.addDataStrings(serviceId);
+
+        return remoteBuilder;
+    }
+
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public Class<? extends LastestStorageBuilder> builder() {
+        return LatestFunction.LastestStorageBuilder.class;
+    }
+
+    public static class LastestStorageBuilder implements StorageBuilder<LatestFunction> {
+        @Override
+        public LatestFunction map2Data(final Map<String, Object> dbMap) {
+            LatestFunction metrics = new LatestFunction() {
+                @Override
+                public AcceptableValue<Long> createNew() {
+                    throw new UnexpectedException("createNew should not be called");
+                }
+            };
+            metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
+            metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
+            metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public Map<String, Object> data2Map(final LatestFunction storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(VALUE, storageData.getValue());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+            map.put(ENTITY_ID, storageData.getEntityId());
+            return map;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof LatestFunction))
+            return false;
+        LatestFunction function = (LatestFunction) o;
+        return Objects.equals(entityId, function.entityId) &&
+            getTimeBucket() == function.getTimeBucket();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityId, getTimeBucket());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
index 366b802..449d525 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
@@ -76,6 +76,8 @@ public class PrometheusMetricConverter {
 
     private final static String AVG_LABELED = "avgLabeled";
 
+    private final static String LATEST = "latest";
+
     private final Window window = new Window();
 
     private final List<MetricsRule> rules;
@@ -124,7 +126,7 @@ public class PrometheusMetricConverter {
                         }
                         return rule._3.getLabelFilter().stream()
                             .allMatch(matchRule -> matchRule.getOptions().stream()
-                                .anyMatch(metric.getLabels().get(matchRule.getKey())::matches));
+                                .anyMatch(option -> matchLabel(option, metric.getLabels().get(matchRule.getKey()))));
                     })
                     .map(rule -> Tuple.of(rule._1, rule._2, rule._3, metric))
             )
@@ -158,6 +160,7 @@ public class PrometheusMetricConverter {
                 log.debug("Building metrics {} -> {}", operation, sources);
                 Try.run(() -> {
                     switch (operation.getName()) {
+                        case LATEST:
                         case AVG:
                             sources.forEach((source, metrics) -> {
                                 AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
@@ -287,4 +290,19 @@ public class PrometheusMetricConverter {
             .onFailure(e -> log.debug(debugMessage + " failed", e))
             .toJavaStream();
     }
+
+    private boolean matchLabel(String option, String labelValue) {
+        if (option.startsWith("!")) {
+            return !matchLabelRule(option.substring(1), labelValue);
+        }
+        log.debug("{} {}", option, labelValue);
+        return matchLabelRule(option, labelValue);
+    }
+
+    private boolean matchLabelRule(String rule, String labelValue) {
+        if (Strings.isNullOrEmpty(rule)) {
+            return Strings.isNullOrEmpty(labelValue);
+        }
+        return labelValue.matches(rule);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
index f44952c..fb6b73c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
@@ -50,7 +50,7 @@ public class MetricsQueryService implements Service {
     /**
      * Read metrics single value in the duration of required metrics
      */
-    public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
+    public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
         return getMetricQueryDAO().readMetricsValue(
             condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
index aae9c58..68c9644 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
@@ -19,5 +19,5 @@
 package org.apache.skywalking.oap.server.core.query.sql;
 
 public enum Function {
-    None, Avg, Sum
+    None, Avg, Sum, Latest
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
index 53fdf91..1d51297 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.query.type;
 
+import io.vavr.collection.Stream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,4 +37,8 @@ public class IntValues {
         }
         return defaultValue;
     }
+
+    public long latestValue(int defaultValue) {
+        return Stream.ofAll(values).map(KVInt::getValue).findLast(v -> v != defaultValue).getOrElse((long) defaultValue);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
index 112a1ea..548d5c1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.storage.query;
 
+import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -44,7 +45,7 @@ import static java.util.stream.Collectors.toList;
  * @since 8.0.0
  */
 public interface IMetricsQueryDAO extends DAO {
-    int readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
+    long readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
 
     MetricsValues readMetricsValues(MetricsCondition condition,
                                     String valueColumnName,
@@ -95,7 +96,7 @@ public interface IMetricsQueryDAO extends DAO {
             final List<String> ids,
             final Map<String, DataTable> idMap) {
             List<String> allLabels;
-            if (Objects.isNull(labels) || labels.size() < 1) {
+            if (Objects.isNull(labels) || labels.size() < 1 || labels.stream().allMatch(Strings::isNullOrEmpty)) {
                 allLabels = idMap.values().stream()
                     .flatMap(dataTable -> dataTable.keys().stream())
                     .distinct().collect(Collectors.toList());
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java
new file mode 100644
index 0000000..67c0683
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LatestFunctionTest {
+
+    @Spy
+    private LatestFunction function;
+
+    @Test
+    public void testAccept() {
+        long time = 1597113318673L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        assertThat(function.getValue(), is(time));
+        time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        assertThat(function.getValue(), is(time));
+    }
+
+    @Test
+    public void testCalculate() {
+        long time1 = 1597113318673L;
+        long time2 = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time1);
+        function.accept(MeterEntity.newService("latest_sync_time"), time2);
+        function.calculate();
+        assertThat(function.getValue(), is(time2));
+    }
+
+    @Test
+    public void testSerialize() {
+        long time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        LatestFunction function2 = Mockito.spy(LatestFunction.class);
+        function2.deserialize(function.serialize().build());
+        assertThat(function2.getEntityId(), is(function.getEntityId()));
+        assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+    }
+
+    @Test
+    public void testBuilder() throws IllegalAccessException, InstantiationException {
+        long time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        function.calculate();
+        StorageBuilder<LatestFunction> storageBuilder = function.builder().newInstance();
+
+        Map<String, Object> map = storageBuilder.data2Map(function);
+        map.put(LatestFunction.VALUE, map.get(LatestFunction.VALUE));
+
+        LatestFunction function2 = storageBuilder.map2Data(map);
+        assertThat(function2.getValue(), is(function.getValue()));
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
index af72077..3c3d40a 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
@@ -102,7 +102,7 @@ public class MetricsQuery implements GraphQLQueryResolver {
     /**
      * Read metrics single value in the duration of required metrics
      */
-    public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
+    public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
         if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
             return 0;
         }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index f38def1..5ecefdf 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a
+Subproject commit 5ecefdf2c16ca16d4973806bdd26f9eafa3faf1d
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 9e43af3..c68aaef 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -57,16 +57,20 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         buildQuery(sourceBuilder, condition, duration);
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
 
         TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
                                                                          .field(Metrics.ENTITY_ID)
                                                                          .size(1);
-        final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
         functionAggregation(function, entityIdAggregation, valueColumnName);
 
         sourceBuilder.aggregation(entityIdAggregation);
@@ -78,16 +82,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
             switch (function) {
                 case Sum:
                     Sum sum = idBucket.getAggregations().get(valueColumnName);
-                    return (int) sum.getValue();
+                    return (long) sum.getValue();
                 case Avg:
                     Avg avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
                 default:
                     avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
             }
         }
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
index 977c3e0..d64665a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
@@ -46,16 +46,20 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         buildQuery(sourceBuilder, condition, duration);
 
-        TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
-                                                                         .field(Metrics.ENTITY_ID)
-                                                                         .size(1);
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
+        TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
+            .field(Metrics.ENTITY_ID)
+            .size(1);
         functionAggregation(function, entityIdAggregation, valueColumnName);
 
         sourceBuilder.aggregation(entityIdAggregation);
@@ -67,16 +71,16 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
             switch (function) {
                 case Sum:
                     Sum sum = idBucket.getAggregations().get(valueColumnName);
-                    return (int) sum.getValue();
+                    return (long) sum.getValue();
                 case Avg:
                     Avg avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
                 default:
                     avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
             }
         }
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     protected void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
index 5fa5832..a78b9dc 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
@@ -60,10 +60,14 @@ public class MetricsQuery implements IMetricsQueryDAO {
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) throws IOException {
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
         final String measurement = condition.getName();
 
         SelectionQueryImpl query = select();
@@ -93,11 +97,11 @@ public class MetricsQuery implements IMetricsQueryDAO {
         if (CollectionUtils.isNotEmpty(seriesList)) {
             for (QueryResult.Series series : seriesList) {
                 Number value = (Number) series.getValues().get(0).get(1);
-                return value.intValue();
+                return value.longValue();
             }
         }
 
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
index f2b0f32..97f93e7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
@@ -49,10 +49,14 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 String valueColumnName,
                                 final Duration duration) throws IOException {
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
         String op;
         switch (function) {
             case Avg:
@@ -80,13 +84,13 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
                 parameters.toArray(new Object[0])
             )) {
                 while (resultSet.next()) {
-                    return resultSet.getInt("value");
+                    return resultSet.getLong("value");
                 }
             }
         } catch (SQLException e) {
             throw new IOException(e);
         }
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     @Override