You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/08/15 06:50:49 UTC

[skywalking] branch meter-latest created (now f6d12ab)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch meter-latest
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at f6d12ab  Add latest function

This branch includes the following new commits:

     new f6d12ab  Add latest function

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Add latest function

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch meter-latest
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit f6d12abd2284cc019f7b637af5ac844379e088f9
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Aug 15 14:48:59 2020 +0800

    Add latest function
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../analysis/meter/function/LatestFunction.java    | 180 +++++++++++++++++++++
 .../promethues/PrometheusMetricConverter.java      |  20 ++-
 .../oap/server/core/query/MetricsQueryService.java |   2 +-
 .../oap/server/core/query/sql/Function.java        |   2 +-
 .../oap/server/core/query/type/IntValues.java      |   5 +
 .../core/storage/query/IMetricsQueryDAO.java       |   5 +-
 .../meter/function/LatestFunctionTest.java         |  82 ++++++++++
 .../oap/query/graphql/resolver/MetricsQuery.java   |   2 +-
 .../src/main/resources/query-protocol              |   2 +-
 .../elasticsearch/query/MetricsQueryEsDAO.java     |  16 +-
 .../elasticsearch7/query/MetricsQueryEs7DAO.java   |  20 ++-
 .../plugin/influxdb/query/MetricsQuery.java        |  10 +-
 .../plugin/jdbc/h2/dao/H2MetricsQueryDAO.java      |  10 +-
 13 files changed, 329 insertions(+), 27 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java
new file mode 100644
index 0000000..17f838e
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunction.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
+import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@MeterFunction(functionName = "latest")
+@ToString
+public abstract class LatestFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
+    protected static final String VALUE = "value";
+
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID, length = 512)
+    private String entityId;
+
+    /**
+     * Service ID is required for sort query.
+     */
+    @Setter
+    @Getter
+    @Column(columnName = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
+
+    @Getter
+    @Setter
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Latest)
+    private long value;
+
+    @Override public void accept(MeterEntity entity, Long value) {
+        this.entityId = entity.id();
+        this.serviceId = entity.serviceId();
+        this.value = value;
+    }
+
+    @Entrance public final void combine(@SourceFrom long value) {
+        this.value = value;
+    }
+
+    @Override public final void combine(Metrics metrics) {
+        LatestFunction latestFunction = (LatestFunction) metrics;
+        combine(latestFunction.value);
+    }
+
+    @Override public void calculate() {
+
+    }
+
+    @Override
+    public Metrics toHour() {
+        LatestFunction metrics = (LatestFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setServiceId(getServiceId());
+        metrics.setValue(getValue());
+        return metrics;
+    }
+
+    @Override
+    public Metrics toDay() {
+        LatestFunction metrics = (LatestFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setServiceId(getServiceId());
+        metrics.setValue(getValue());
+        return metrics;
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        this.value = remoteData.getDataLongs(0);
+        setTimeBucket(remoteData.getDataLongs(1));
+
+        this.entityId = remoteData.getDataStrings(0);
+        this.serviceId = remoteData.getDataStrings(1);
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.addDataLongs(value);
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        remoteBuilder.addDataStrings(entityId);
+        remoteBuilder.addDataStrings(serviceId);
+
+        return remoteBuilder;
+    }
+
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public Class<? extends LastestStorageBuilder> builder() {
+        return LatestFunction.LastestStorageBuilder.class;
+    }
+
+    public static class LastestStorageBuilder implements StorageBuilder<LatestFunction> {
+        @Override
+        public LatestFunction map2Data(final Map<String, Object> dbMap) {
+            LatestFunction metrics = new LatestFunction() {
+                @Override
+                public AcceptableValue<Long> createNew() {
+                    throw new UnexpectedException("createNew should not be called");
+                }
+            };
+            metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
+            metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
+            metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public Map<String, Object> data2Map(final LatestFunction storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(VALUE, storageData.getValue());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+            map.put(ENTITY_ID, storageData.getEntityId());
+            return map;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof LatestFunction))
+            return false;
+        LatestFunction function = (LatestFunction) o;
+        return Objects.equals(entityId, function.entityId) &&
+            getTimeBucket() == function.getTimeBucket();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityId, getTimeBucket());
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
index 366b802..449d525 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/metric/promethues/PrometheusMetricConverter.java
@@ -76,6 +76,8 @@ public class PrometheusMetricConverter {
 
     private final static String AVG_LABELED = "avgLabeled";
 
+    private final static String LATEST = "latest";
+
     private final Window window = new Window();
 
     private final List<MetricsRule> rules;
@@ -124,7 +126,7 @@ public class PrometheusMetricConverter {
                         }
                         return rule._3.getLabelFilter().stream()
                             .allMatch(matchRule -> matchRule.getOptions().stream()
-                                .anyMatch(metric.getLabels().get(matchRule.getKey())::matches));
+                                .anyMatch(option -> matchLabel(option, metric.getLabels().get(matchRule.getKey()))));
                     })
                     .map(rule -> Tuple.of(rule._1, rule._2, rule._3, metric))
             )
@@ -158,6 +160,7 @@ public class PrometheusMetricConverter {
                 log.debug("Building metrics {} -> {}", operation, sources);
                 Try.run(() -> {
                     switch (operation.getName()) {
+                        case LATEST:
                         case AVG:
                             sources.forEach((source, metrics) -> {
                                 AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
@@ -287,4 +290,19 @@ public class PrometheusMetricConverter {
             .onFailure(e -> log.debug(debugMessage + " failed", e))
             .toJavaStream();
     }
+
+    private boolean matchLabel(String option, String labelValue) {
+        if (option.startsWith("!")) {
+            return !matchLabelRule(option.substring(1), labelValue);
+        }
+        log.debug("{} {}", option, labelValue);
+        return matchLabelRule(option, labelValue);
+    }
+
+    private boolean matchLabelRule(String rule, String labelValue) {
+        if (Strings.isNullOrEmpty(rule)) {
+            return Strings.isNullOrEmpty(labelValue);
+        }
+        return labelValue.matches(rule);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
index f44952c..fb6b73c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetricsQueryService.java
@@ -50,7 +50,7 @@ public class MetricsQueryService implements Service {
     /**
      * Read metrics single value in the duration of required metrics
      */
-    public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
+    public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
         return getMetricQueryDAO().readMetricsValue(
             condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
index aae9c58..68c9644 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/sql/Function.java
@@ -19,5 +19,5 @@
 package org.apache.skywalking.oap.server.core.query.sql;
 
 public enum Function {
-    None, Avg, Sum
+    None, Avg, Sum, Latest
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
index 53fdf91..1d51297 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/IntValues.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.query.type;
 
+import io.vavr.collection.Stream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,4 +37,8 @@ public class IntValues {
         }
         return defaultValue;
     }
+
+    public long latestValue(int defaultValue) {
+        return Stream.ofAll(values).map(KVInt::getValue).findLast(v -> v != defaultValue).getOrElse((long) defaultValue);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
index 112a1ea..548d5c1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetricsQueryDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.storage.query;
 
+import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -44,7 +45,7 @@ import static java.util.stream.Collectors.toList;
  * @since 8.0.0
  */
 public interface IMetricsQueryDAO extends DAO {
-    int readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
+    long readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
 
     MetricsValues readMetricsValues(MetricsCondition condition,
                                     String valueColumnName,
@@ -95,7 +96,7 @@ public interface IMetricsQueryDAO extends DAO {
             final List<String> ids,
             final Map<String, DataTable> idMap) {
             List<String> allLabels;
-            if (Objects.isNull(labels) || labels.size() < 1) {
+            if (Objects.isNull(labels) || labels.size() < 1 || labels.stream().allMatch(Strings::isNullOrEmpty)) {
                 allLabels = idMap.values().stream()
                     .flatMap(dataTable -> dataTable.keys().stream())
                     .distinct().collect(Collectors.toList());
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java
new file mode 100644
index 0000000..67c0683
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/LatestFunctionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LatestFunctionTest {
+
+    @Spy
+    private LatestFunction function;
+
+    @Test
+    public void testAccept() {
+        long time = 1597113318673L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        assertThat(function.getValue(), is(time));
+        time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        assertThat(function.getValue(), is(time));
+    }
+
+    @Test
+    public void testCalculate() {
+        long time1 = 1597113318673L;
+        long time2 = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time1);
+        function.accept(MeterEntity.newService("latest_sync_time"), time2);
+        function.calculate();
+        assertThat(function.getValue(), is(time2));
+    }
+
+    @Test
+    public void testSerialize() {
+        long time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        LatestFunction function2 = Mockito.spy(LatestFunction.class);
+        function2.deserialize(function.serialize().build());
+        assertThat(function2.getEntityId(), is(function.getEntityId()));
+        assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
+    }
+
+    @Test
+    public void testBuilder() throws IllegalAccessException, InstantiationException {
+        long time = 1597113447737L;
+        function.accept(MeterEntity.newService("latest_sync_time"), time);
+        function.calculate();
+        StorageBuilder<LatestFunction> storageBuilder = function.builder().newInstance();
+
+        Map<String, Object> map = storageBuilder.data2Map(function);
+        map.put(LatestFunction.VALUE, map.get(LatestFunction.VALUE));
+
+        LatestFunction function2 = storageBuilder.map2Data(map);
+        assertThat(function2.getValue(), is(function.getValue()));
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
index af72077..3c3d40a 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetricsQuery.java
@@ -102,7 +102,7 @@ public class MetricsQuery implements GraphQLQueryResolver {
     /**
      * Read metrics single value in the duration of required metrics
      */
-    public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
+    public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
         if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
             return 0;
         }
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index f38def1..5ecefdf 160000
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a
+Subproject commit 5ecefdf2c16ca16d4973806bdd26f9eafa3faf1d
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 9e43af3..c68aaef 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -57,16 +57,20 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         buildQuery(sourceBuilder, condition, duration);
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
 
         TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
                                                                          .field(Metrics.ENTITY_ID)
                                                                          .size(1);
-        final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
         functionAggregation(function, entityIdAggregation, valueColumnName);
 
         sourceBuilder.aggregation(entityIdAggregation);
@@ -78,16 +82,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
             switch (function) {
                 case Sum:
                     Sum sum = idBucket.getAggregations().get(valueColumnName);
-                    return (int) sum.getValue();
+                    return (long) sum.getValue();
                 case Avg:
                     Avg avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
                 default:
                     avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
             }
         }
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
index 977c3e0..d64665a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/MetricsQueryEs7DAO.java
@@ -46,16 +46,20 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         buildQuery(sourceBuilder, condition, duration);
 
-        TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
-                                                                         .field(Metrics.ENTITY_ID)
-                                                                         .size(1);
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
+        TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
+            .field(Metrics.ENTITY_ID)
+            .size(1);
         functionAggregation(function, entityIdAggregation, valueColumnName);
 
         sourceBuilder.aggregation(entityIdAggregation);
@@ -67,16 +71,16 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
             switch (function) {
                 case Sum:
                     Sum sum = idBucket.getAggregations().get(valueColumnName);
-                    return (int) sum.getValue();
+                    return (long) sum.getValue();
                 case Avg:
                     Avg avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
                 default:
                     avg = idBucket.getAggregations().get(valueColumnName);
-                    return (int) avg.getValue();
+                    return (long) avg.getValue();
             }
         }
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     protected void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
index 5fa5832..a78b9dc 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
@@ -60,10 +60,14 @@ public class MetricsQuery implements IMetricsQueryDAO {
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) throws IOException {
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
         final String measurement = condition.getName();
 
         SelectionQueryImpl query = select();
@@ -93,11 +97,11 @@ public class MetricsQuery implements IMetricsQueryDAO {
         if (CollectionUtils.isNotEmpty(seriesList)) {
             for (QueryResult.Series series : seriesList) {
                 Number value = (Number) series.getValues().get(0).get(1);
-                return value.intValue();
+                return value.longValue();
             }
         }
 
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
index f2b0f32..97f93e7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsQueryDAO.java
@@ -49,10 +49,14 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
     }
 
     @Override
-    public int readMetricsValue(final MetricsCondition condition,
+    public long readMetricsValue(final MetricsCondition condition,
                                 String valueColumnName,
                                 final Duration duration) throws IOException {
+        int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
         final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+        if (function == Function.Latest) {
+            return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+        }
         String op;
         switch (function) {
             case Avg:
@@ -80,13 +84,13 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
                 parameters.toArray(new Object[0])
             )) {
                 while (resultSet.next()) {
-                    return resultSet.getInt("value");
+                    return resultSet.getLong("value");
                 }
             }
         } catch (SQLException e) {
             throw new IOException(e);
         }
-        return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+        return defaultValue;
     }
 
     @Override