You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/22 09:53:33 UTC

[skywalking] branch metrics-system created (now 1914d1f)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch metrics-system
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 1914d1f  Fix meter system prototype codes.

This branch includes the following new commits:

     new 1914d1f  Fix meter system prototype codes.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Fix meter system prototype codes.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch metrics-system
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 1914d1fc09b8019016c7bc2929378fed3c37893f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Apr 22 17:52:39 2020 +0800

    Fix meter system prototype codes.
---
 oap-server/oal-rt/pom.xml                          |   4 -
 .../skywalking/oal/rt/parser/MetricsHolder.java    |   9 +-
 oap-server/server-core/pom.xml                     |   4 +
 .../oap/server/core/CoreModuleProvider.java        |   7 +
 .../oap/server/core/analysis/StreamDefinition.java |  37 +++++
 .../server/core/analysis/meter/MeterFactory.java   | 180 +++++++++++++++++++++
 .../analysis/meter/function/AcceptableValue.java   |  38 +++++
 .../server/core/analysis/meter/function/Avg.java   | 113 +++++++++++++
 .../analysis/meter/function/MeterFunction.java     |  34 ++++
 .../core/analysis/metrics/LongAvgMetrics.java      |   4 +-
 .../analysis/worker/MetricsStreamProcessor.java    |  27 ++--
 .../elasticsearch/query/AggregationQueryEsDAO.java |  20 ++-
 .../query/AggregationQueryEs7DAO.java              |  21 ++-
 13 files changed, 472 insertions(+), 26 deletions(-)

diff --git a/oap-server/oal-rt/pom.xml b/oap-server/oal-rt/pom.xml
index 8f1362a..79cbcde 100755
--- a/oap-server/oal-rt/pom.xml
+++ b/oap-server/oal-rt/pom.xml
@@ -47,10 +47,6 @@
             <artifactId>freemarker</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.javassist</groupId>
-            <artifactId>javassist</artifactId>
-        </dependency>
-        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/MetricsHolder.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/MetricsHolder.java
index 61031af..05efde2 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/MetricsHolder.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/MetricsHolder.java
@@ -23,10 +23,11 @@ import com.google.common.reflect.ClassPath;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsFunction;
 
 public class MetricsHolder {
-    private static Map<String, Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics>> REGISTER = new HashMap<>();
+    private static Map<String, Class<? extends Metrics>> REGISTER = new HashMap<>();
 
     public static void init() throws IOException {
         ClassPath classpath = ClassPath.from(MetricsHolder.class.getClassLoader());
@@ -38,16 +39,16 @@ public class MetricsHolder {
                 MetricsFunction metricsFunction = aClass.getAnnotation(MetricsFunction.class);
                 REGISTER.put(
                     metricsFunction.functionName(),
-                    (Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics>) aClass
+                    (Class<? extends Metrics>) aClass
                 );
             }
         }
     }
 
-    public static Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> find(
+    public static Class<? extends Metrics> find(
         String functionName) {
         String func = functionName;
-        Class<? extends org.apache.skywalking.oap.server.core.analysis.metrics.Metrics> metricsClass = REGISTER.get(
+        Class<? extends Metrics> metricsClass = REGISTER.get(
             func);
         if (metricsClass == null) {
             throw new IllegalArgumentException("Can't find metrics, " + func);
diff --git a/oap-server/server-core/pom.xml b/oap-server/server-core/pom.xml
index f13910b..5d2b172 100644
--- a/oap-server/server-core/pom.xml
+++ b/oap-server/server-core/pom.xml
@@ -74,6 +74,10 @@
             <artifactId>apm-network</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.javassist</groupId>
+            <artifactId>javassist</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.skywalking</groupId>
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index cf81132..a93ce6d 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationSe
 import org.apache.skywalking.oap.server.core.analysis.ApdexThresholdConfig;
 import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterFactory;
 import org.apache.skywalking.oap.server.core.analysis.metrics.ApdexMetrics;
 import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
@@ -154,6 +155,12 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
+        try {
+            MeterFactory.init(getManager());
+        } catch (IOException e) {
+            throw new ModuleStartException(e.getMessage(), e);
+        }
+
         AnnotationScan oalDisable = new AnnotationScan();
         oalDisable.registerListener(DisableRegister.INSTANCE);
         oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamDefinition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamDefinition.java
new file mode 100644
index 0000000..59cabf1
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamDefinition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+
+@RequiredArgsConstructor
+@Getter
+public class StreamDefinition {
+    private final String name;
+    private final int scopeId;
+    private final Class<? extends StorageBuilder> builder;
+    private final Class<? extends StreamProcessor> processor;
+
+    public static StreamDefinition from(Stream stream) {
+        return new StreamDefinition(stream.name(), stream.scopeId(), stream.builder(), stream.processor());
+    }
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterFactory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterFactory.java
new file mode 100644
index 0000000..098adba
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.reflect.ClassPath;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javassist.CannotCompileException;
+import javassist.ClassPool;
+import javassist.CtClass;
+import javassist.CtConstructor;
+import javassist.CtNewConstructor;
+import javassist.CtNewMethod;
+import javassist.NotFoundException;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * MeterFactory provides the API way to create {@link MetricsStreamProcessor} rather than manual analysis metrics or OAL
+ * script.
+ *
+ * @since 8.0.0
+ */
+@Slf4j
+public class MeterFactory {
+    private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic.";
+    private static ModuleManager MANAGER;
+    private static ClassPool CLASS_POOL;
+    private static Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
+    /**
+     * Host the dynamic meter prototype classes. These classes could be create dynamically through {@link
+     * Object#clone()} in the runtime;
+     */
+    private static Map<String, MeterDefinition> METER_PROTOTYPES = new HashMap<>();
+
+    public static void init(final ModuleManager manager) throws IOException {
+        MANAGER = manager;
+        CLASS_POOL = ClassPool.getDefault();
+
+        ClassPath classpath = ClassPath.from(MeterFactory.class.getClassLoader());
+        ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
+        for (ClassPath.ClassInfo classInfo : classes) {
+            Class<?> aClass = classInfo.load();
+
+            if (aClass.isAnnotationPresent(MeterFunction.class)) {
+                MeterFunction metricsFunction = aClass.getAnnotation(MeterFunction.class);
+                FUNCTION_REGISTER.put(
+                    metricsFunction.functionName(),
+                    (Class<? extends MeterFunction>) aClass
+                );
+            }
+        }
+    }
+
+    /**
+     * Create streaming calculable {@link AcceptableValue}. This methods is synchronized due to heavy implementation
+     * including creating dynamic class. Don't use this in concurrency runtime.
+     *
+     * @param metricsName  The name used as the storage eneity and in the query stage.
+     * @param functionName The function provided through {@link MeterFunction}.
+     * @return {@link AcceptableValue} to accept the value for further distributed calculation.
+     */
+    public synchronized AcceptableValue create(String metricsName, String functionName, ScopeType type) {
+        MeterDefinition meterDefinition = METER_PROTOTYPES.get(metricsName);
+        if (meterDefinition != null) {
+            return meterDefinition.getMeterPrototype().createNew();
+        } else {
+            /**
+             * Create a new meter class dynamically.
+             */
+            final Class<? extends MeterFunction> meterFunction = FUNCTION_REGISTER.get(functionName);
+            if (meterFunction == null) {
+                throw new IllegalArgumentException("Function " + functionName + "can't be found.");
+            }
+            final CtClass parentClass;
+            try {
+                parentClass = CLASS_POOL.get(meterFunction.getCanonicalName());
+            } catch (NotFoundException e) {
+                throw new IllegalArgumentException("Function " + functionName + "can't be found by javaassist.");
+            }
+            final String className = formatName(metricsName);
+            CtClass metricsClass = CLASS_POOL.makeClass(className, parentClass);
+
+            /**
+             * Create empty construct
+             */
+            try {
+                CtConstructor defaultConstructor = CtNewConstructor.make("public " + className + "() {}", metricsClass);
+                metricsClass.addConstructor(defaultConstructor);
+            } catch (CannotCompileException e) {
+                log.error("Can't add empty constructor in " + className + ".", e);
+                throw new UnexpectedException(e.getMessage(), e);
+            }
+
+            /**
+             * Generate `AcceptableValue<T> createNew()` method.
+             */
+            try {
+                metricsClass.addMethod(CtNewMethod.make(
+                    ""
+                        + "public AcceptableValue<T> createNew() {"
+                        + "    return new " + className + "();"
+                        + " }"
+                    , metricsClass));
+            } catch (CannotCompileException e) {
+                log.error("Can't generate createNew method for " + className + ".", e);
+                throw new UnexpectedException(e.getMessage(), e);
+            }
+
+            Class targetClass;
+            try {
+                targetClass = metricsClass.toClass(MeterFactory.class.getClassLoader(), null);
+                AcceptableValue prototype = (AcceptableValue) targetClass.newInstance();
+                METER_PROTOTYPES.put(metricsName, new MeterDefinition(type, prototype));
+
+                log.debug("Generate metrics class, " + metricsClass.getName());
+
+                MetricsStreamProcessor.getInstance().create(
+                    MANAGER,
+                    new StreamDefinition(metricsName, type.scopeId, prototype.builder(), MetricsStreamProcessor.class),
+                    targetClass
+                );
+                return prototype;
+            } catch (CannotCompileException | IllegalAccessException | InstantiationException e) {
+                log.error("Can't compile/load/init " + className + ".", e);
+                throw new UnexpectedException(e.getMessage(), e);
+            }
+        }
+    }
+
+    private String formatName(String metricsName) {
+        return METER_CLASS_PACKAGE + metricsName.toLowerCase();
+    }
+
+    public enum ScopeType {
+        SERVICE(DefaultScopeDefine.SERVICE),
+        SERVICE_INSTANCE(DefaultScopeDefine.SERVICE_INSTANCE),
+        ENDPOINT(DefaultScopeDefine.ENDPOINT);
+
+        @Getter
+        private final int scopeId;
+
+        ScopeType(final int scopeId) {
+            this.scopeId = scopeId;
+        }
+    }
+
+    @RequiredArgsConstructor
+    @Getter
+    private class MeterDefinition {
+        private final ScopeType scopeType;
+        private final AcceptableValue meterPrototype;
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
new file mode 100644
index 0000000..e45c4fa
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+
+/**
+ * Indicate this function accepting the data of type T.
+ */
+public interface AcceptableValue<T> {
+    void accept(String entityId, T value);
+
+    /**
+     * @return a new instance based on the implementation, it should be the same class.
+     */
+    AcceptableValue<T> createNew();
+
+    /**
+     * @return builder
+     */
+    Class<? extends StorageBuilder> builder();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
new file mode 100644
index 0000000..1328934
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@MeterFunction(functionName = "avg")
+public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID)
+    private String entityId;
+
+    @Override
+    public Metrics toHour() {
+        Avg metrics = (Avg) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setSummation(getSummation());
+        metrics.setCount(getCount());
+        return metrics;
+    }
+
+    @Override
+    public Metrics toDay() {
+        Avg metrics = (Avg) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setSummation(getSummation());
+        metrics.setCount(getCount());
+        return metrics;
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        this.count = remoteData.getDataLongs(0);
+        this.summation = remoteData.getDataLongs(1);
+        setTimeBucket(remoteData.getDataLongs(2));
+
+        this.entityId = remoteData.getDataStrings(0);
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.addDataLongs(count);
+        remoteBuilder.addDataLongs(summation);
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        remoteBuilder.addDataStrings(entityId);
+
+        return remoteBuilder;
+    }
+
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public void accept(final String entityId, final Long value) {
+        this.entityId = entityId;
+        this.summation += value;
+        this.count += 1;
+    }
+
+    @Override
+    public Class<? extends StorageBuilder> builder() {
+        return AvgStorageBuilder.class;
+    }
+
+    public static class AvgStorageBuilder implements StorageBuilder<Avg> {
+
+        @Override
+        public Avg map2Data(final Map<String, Object> dbMap) {
+            return null;
+        }
+
+        @Override
+        public Map<String, Object> data2Map(final Avg storageData) {
+            return null;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/MeterFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/MeterFunction.java
new file mode 100644
index 0000000..f1a7bb7
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/MeterFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Meter function indicate this class is used in SkyWalking meter system. The meter system accepts data from any number
+ * based metrics ecosystem, typically like Prometheus and Micrometer Application Monitoring
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MeterFunction {
+    String functionName();
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LongAvgMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LongAvgMetrics.java
index 0298401..a45d26b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LongAvgMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/LongAvgMetrics.java
@@ -37,11 +37,11 @@ public abstract class LongAvgMetrics extends Metrics implements LongValueHolder
     @Getter
     @Setter
     @Column(columnName = SUMMATION, storageOnly = true)
-    private long summation;
+    protected long summation;
     @Getter
     @Setter
     @Column(columnName = COUNT, storageOnly = true)
-    private long count;
+    protected long count;
     @Getter
     @Setter
     @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Avg)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index c6ac3b0..5452cb2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -30,6 +30,7 @@ import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
 import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
 import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
@@ -91,18 +92,24 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
      * @param stream             definition of the metrics class.
      * @param metricsClass       data type of the streaming calculation.
      */
-    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) {
-        if (DisableRegister.INSTANCE.include(stream.name())) {
+        this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void create(ModuleDefineHolder moduleDefineHolder,
+                       StreamDefinition stream,
+                       Class<? extends Metrics> metricsClass) {
+        if (DisableRegister.INSTANCE.include(stream.getName())) {
             return;
         }
 
         StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IMetricsDAO metricsDAO;
         try {
-            metricsDAO = storageDAO.newMetricsDao(stream.builder().newInstance());
+            metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
         } catch (InstantiationException | IllegalAccessException e) {
-            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " metrics DAO failure.", e);
+            throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
         }
 
         INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
@@ -128,25 +135,25 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
         if (supportDownSampling) {
             if (configService.shouldToHour()) {
                 Model model = modelSetter.add(
-                    metricsClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Hour), false);
+                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
                 hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
             }
             if (configService.shouldToDay()) {
                 Model model = modelSetter.add(
-                    metricsClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Day), false);
+                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
                 dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
             }
 
             transWorker = new MetricsTransWorker(
-                moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker);
+                moduleDefineHolder, stream.getName(), hourPersistentWorker, dayPersistentWorker);
         }
 
         Model model = modelSetter.add(
-            metricsClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Minute), false);
+            metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
         MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
             moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
 
-        String remoteReceiverWorkerName = stream.name() + "_rec";
+        String remoteReceiverWorkerName = stream.getName() + "_rec";
         IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
                                                                        .provider()
                                                                        .getService(IWorkerInstanceSetter.class);
@@ -154,7 +161,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
 
         MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
-            moduleDefineHolder, remoteWorker, stream.name());
+            moduleDefineHolder, remoteWorker, stream.getName());
 
         entryWorkers.put(metricsClass, aggregateWorker);
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
index c1d4ce3..0e79172 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java
@@ -31,7 +31,9 @@ import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@@ -50,15 +52,27 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
                                             final Duration duration,
                                             final List<KeyValue> additionalConditions) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
-                                         .lte(duration.getEndTimeBucket())
-                                         .gte(duration.getStartTimeBucket()));
+        final RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
+                                                            .lte(duration.getEndTimeBucket())
+                                                            .gte(duration.getStartTimeBucket());
 
         boolean asc = false;
         if (condition.getOrder().equals(Order.ASC)) {
             asc = true;
         }
 
+        if (additionalConditions != null && additionalConditions.size() > 0) {
+            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+            additionalConditions.forEach(additionalCondition -> {
+                boolQuery.must()
+                         .add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue()));
+            });
+            boolQuery.must().add(queryBuilder);
+            sourceBuilder.query(boolQuery);
+        } else {
+            sourceBuilder.query(queryBuilder);
+        }
+
         sourceBuilder.aggregation(
             AggregationBuilders.terms(Metrics.ENTITY_ID)
                                .field(Metrics.ENTITY_ID)
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
index f5b9e37..a82ca88 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/AggregationQueryEs7DAO.java
@@ -30,7 +30,9 @@ import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@@ -52,15 +54,28 @@ public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
                                             final Duration duration,
                                             final List<KeyValue> additionalConditions) throws IOException {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
-        sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
-                                         .lte(duration.getEndTimeBucket())
-                                         .gte(duration.getStartTimeBucket()));
+
+        final RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery(Metrics.TIME_BUCKET)
+                                                            .lte(duration.getEndTimeBucket())
+                                                            .gte(duration.getStartTimeBucket());
 
         boolean asc = false;
         if (condition.getOrder().equals(Order.ASC)) {
             asc = true;
         }
 
+        if (additionalConditions != null && additionalConditions.size() > 0) {
+            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
+            additionalConditions.forEach(additionalCondition -> {
+                boolQuery.must()
+                         .add(QueryBuilders.termsQuery(additionalCondition.getKey(), additionalCondition.getValue()));
+            });
+            boolQuery.must().add(queryBuilder);
+            sourceBuilder.query(boolQuery);
+        } else {
+            sourceBuilder.query(queryBuilder);
+        }
+
         sourceBuilder.aggregation(
             AggregationBuilders.terms(Metrics.ENTITY_ID)
                                .field(Metrics.ENTITY_ID)