You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/23 04:02:56 UTC

[skywalking] branch metrics-system updated: Shape the prototype

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch metrics-system
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/metrics-system by this push:
     new e568dd6  Shape the prototype
e568dd6 is described below

commit e568dd61b22fb4a1fed249a9d72b9375ab3450f7
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Apr 23 12:01:58 2020 +0800

    Shape the prototype
---
 .../oap/server/core/CoreModuleProvider.java        |  8 +-
 .../server/core/analysis/meter/MeterEntity.java    | 82 ++++++++++++++++++++
 .../server/core/analysis/meter/MeterSystem.java    | 87 +++++++++++++++++-----
 .../analysis/meter/function/AcceptableValue.java   |  5 +-
 .../server/core/analysis/meter/function/Avg.java   | 10 ++-
 .../server/core/storage/model/ModelInstaller.java  |  4 +
 .../provider/PrometheusFetcherProvider.java        | 12 +--
 7 files changed, 177 insertions(+), 31 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 531a8c5..3db523f 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -155,12 +155,8 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        try {
-            MeterSystem meterSystem = new MeterSystem(getManager());
-            this.registerServiceImplementation(MeterSystem.class, meterSystem);
-        } catch (IOException e) {
-            throw new ModuleStartException(e.getMessage(), e);
-        }
+        MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
+        this.registerServiceImplementation(MeterSystem.class, meterSystem);
 
         AnnotationScan oalDisable = new AnnotationScan();
         oalDisable.registerListener(DisableRegister.INSTANCE);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
new file mode 100644
index 0000000..a006dbe
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+
+/**
+ * MeterEntity represents the entity in the meter system.
+ */
+@EqualsAndHashCode
+@ToString
+public class MeterEntity {
+    private ScopeType scopeType;
+    private String serviceName;
+    private String instanceName;
+    private String endpointName;
+
+    private MeterEntity(final ScopeType scopeType,
+                        final String serviceName,
+                        final String instanceName,
+                        final String endpointName) {
+        this.scopeType = scopeType;
+        this.serviceName = serviceName;
+        this.instanceName = instanceName;
+        this.endpointName = endpointName;
+    }
+
+    public String id() {
+        switch (scopeType) {
+            case SERVICE:
+                // In Meter system, only normal service, because we don't conjecture any node.
+                return IDManager.ServiceID.buildId(serviceName, true);
+            case SERVICE_INSTANCE:
+                return IDManager.ServiceInstanceID.buildId(
+                    IDManager.ServiceID.buildId(serviceName, true), instanceName);
+            case ENDPOINT:
+                return IDManager.EndpointID.buildId(IDManager.ServiceID.buildId(serviceName, true), endpointName);
+            default:
+                throw new UnexpectedException("Unexpected scope type of entity " + this.toString());
+        }
+    }
+
+    /**
+     * Create a service level meter entity.
+     */
+    public static MeterEntity newService(String serviceName) {
+        return new MeterEntity(ScopeType.SERVICE, serviceName, null, null);
+    }
+
+    /**
+     * Create a service instance level meter entity.
+     */
+    public static MeterEntity newServiceInstance(String serviceName, String serviceInstance) {
+        return new MeterEntity(ScopeType.SERVICE_INSTANCE, serviceName, serviceInstance, null);
+    }
+
+    /**
+     * Create an endpoint level meter entity.
+     */
+    public static MeterEntity newEndpoint(String serviceName, String endpointName) {
+        return new MeterEntity(ScopeType.ENDPOINT, serviceName, null, endpointName);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index aba1f5b..b8d6e34 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -23,7 +23,9 @@ import com.google.common.reflect.ClassPath;
 import java.io.IOException;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javassist.CannotCompileException;
 import javassist.ClassPool;
@@ -32,6 +34,7 @@ import javassist.CtConstructor;
 import javassist.CtNewConstructor;
 import javassist.CtNewMethod;
 import javassist.NotFoundException;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -53,20 +56,36 @@ import org.apache.skywalking.oap.server.library.module.Service;
 @Slf4j
 public class MeterSystem implements Service {
     private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic.";
-    private ModuleManager MANAGER;
-    private ClassPool CLASS_POOL;
-    private Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
+    private static ModuleManager MANAGER;
+    private static ClassPool CLASS_POOL;
+    private static List<NewMeter> TO_BE_CREATED_METERS = new ArrayList<>();
+    private static Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
     /**
      * Host the dynamic meter prototype classes. These classes could be create dynamically through {@link
      * Object#clone()} in the runtime;
      */
     private static Map<String, MeterDefinition> METER_PROTOTYPES = new HashMap<>();
+    private static MeterSystem METER_SYSTEM;
+    private static boolean METER_CREATABLE = true;
+
+    private MeterSystem() {
+
+    }
+
+    public synchronized static MeterSystem meterSystem(final ModuleManager manager) {
+        if (METER_SYSTEM != null) {
+            return METER_SYSTEM;
+        }
 
-    public MeterSystem(final ModuleManager manager) throws IOException {
         MANAGER = manager;
         CLASS_POOL = ClassPool.getDefault();
 
-        ClassPath classpath = ClassPath.from(MeterSystem.class.getClassLoader());
+        ClassPath classpath = null;
+        try {
+            classpath = ClassPath.from(MeterSystem.class.getClassLoader());
+        } catch (IOException e) {
+            throw new UnexpectedException("Load class path failure.");
+        }
         ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
         for (ClassPath.ClassInfo classInfo : classes) {
             Class<?> functionClass = classInfo.load();
@@ -83,10 +102,12 @@ public class MeterSystem implements Service {
                 );
             }
         }
+        METER_SYSTEM = new MeterSystem();
+        return METER_SYSTEM;
     }
 
     /**
-     * Create streaming calculable {@link AcceptableValue}. This methods is synchronized due to heavy implementation
+     * Create streaming calculation of the given metrics name. This methods is synchronized due to heavy implementation
      * including creating dynamic class. Don't use this in concurrency runtime.
      *
      * @param metricsName  The name used as the storage eneity and in the query stage.
@@ -99,10 +120,32 @@ public class MeterSystem implements Service {
                                            String functionName,
                                            ScopeType type,
                                            Class<T> dataType) throws IllegalArgumentException {
-        MeterDefinition meterDefinition = METER_PROTOTYPES.get(metricsName);
-        if (meterDefinition != null) {
+        if (!METER_CREATABLE) {
+            throw new IllegalStateException("Can't create new metrics anymore");
+        }
+
+        final NewMeter newMeter = new NewMeter(metricsName, functionName, type, dataType);
+        if (TO_BE_CREATED_METERS.contains(newMeter)) {
             return false;
-        } else {
+        }
+
+        TO_BE_CREATED_METERS.add(newMeter);
+        return true;
+    }
+
+    /**
+     * Close the {@link #create(String, String, ScopeType, Class)} channel, and build the model and streaming
+     * definitions.
+     */
+    public static void closeMeterCreationChannel() {
+        METER_CREATABLE = false;
+
+        TO_BE_CREATED_METERS.forEach(newMeter -> {
+            String metricsName = newMeter.metricsName;
+            String functionName = newMeter.functionName;
+            ScopeType type = newMeter.type;
+            Class<?> dataType = newMeter.dataType;
+
             /**
              * Create a new meter class dynamically.
              */
@@ -135,7 +178,8 @@ public class MeterSystem implements Service {
             try {
                 parentClass = CLASS_POOL.get(meterFunction.getCanonicalName());
                 if (!Metrics.class.isAssignableFrom(meterFunction)) {
-                    throw new IllegalArgumentException("Function " + functionName + " doesn't inherit from Metrics.");
+                    throw new IllegalArgumentException(
+                        "Function " + functionName + " doesn't inherit from Metrics.");
                 }
             } catch (NotFoundException e) {
                 throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist.");
@@ -147,7 +191,8 @@ public class MeterSystem implements Service {
              * Create empty construct
              */
             try {
-                CtConstructor defaultConstructor = CtNewConstructor.make("public " + className + "() {}", metricsClass);
+                CtConstructor defaultConstructor = CtNewConstructor.make(
+                    "public " + className + "() {}", metricsClass);
                 metricsClass.addConstructor(defaultConstructor);
             } catch (CannotCompileException e) {
                 log.error("Can't add empty constructor in " + className + ".", e);
@@ -183,17 +228,16 @@ public class MeterSystem implements Service {
                         metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class),
                     targetClass
                 );
-                return true;
             } catch (CannotCompileException | IllegalAccessException | InstantiationException e) {
                 log.error("Can't compile/load/init " + className + ".", e);
                 throw new UnexpectedException(e.getMessage(), e);
             }
-        }
+        });
     }
 
     /**
      * Create an {@link AcceptableValue} instance for streaming calculation. AcceptableValue instance is stateful,
-     * shouldn't do {@link AcceptableValue#accept(String, Object)} once it is pushed into {@link
+     * shouldn't do {@link AcceptableValue#accept(MeterEntity, Object)} once it is pushed into {@link
      * #doStreamingCalculation(AcceptableValue)}.
      *
      * @param metricsName A defined metrics name. Use {@link #create(String, String, ScopeType, Class)} to define a new
@@ -207,7 +251,7 @@ public class MeterSystem implements Service {
         if (meterDefinition == null) {
             throw new IllegalArgumentException("Uncreated metrics " + metricsName);
         }
-        if (!meterDefinition.getScopeType().equals(dataType)) {
+        if (!meterDefinition.getDataType().equals(dataType)) {
             throw new IllegalArgumentException(
                 "Unmatched metrics data type, request for " + dataType.getName()
                     + ", but defined as " + meterDefinition.getDataType());
@@ -225,13 +269,22 @@ public class MeterSystem implements Service {
         MetricsStreamProcessor.getInstance().in((Metrics) acceptableValue);
     }
 
-    private String formatName(String metricsName) {
+    private static String formatName(String metricsName) {
         return metricsName.toLowerCase();
     }
 
     @RequiredArgsConstructor
+    @EqualsAndHashCode
+    public static class NewMeter {
+        private final String metricsName;
+        private final String functionName;
+        private final ScopeType type;
+        private final Class<?> dataType;
+    }
+
+    @RequiredArgsConstructor
     @Getter
-    private class MeterDefinition {
+    private static class MeterDefinition {
         private final ScopeType scopeType;
         private final AcceptableValue meterPrototype;
         private final Class<?> dataType;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
index e45c4fa..9bd9cdd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
@@ -18,13 +18,14 @@
 
 package org.apache.skywalking.oap.server.core.analysis.meter.function;
 
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 
 /**
  * Indicate this function accepting the data of type T.
  */
 public interface AcceptableValue<T> {
-    void accept(String entityId, T value);
+    void accept(MeterEntity entity, T value);
 
     /**
      * @return a new instance based on the implementation, it should be the same class.
@@ -35,4 +36,6 @@ public interface AcceptableValue<T> {
      * @return builder
      */
     Class<? extends StorageBuilder> builder();
+
+    void setTimeBucket(long timeBucket);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
index 828d6bf..e6ed28a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
@@ -20,10 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.meter.function;
 
 import java.util.HashMap;
 import java.util.Map;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -31,6 +33,10 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 @MeterFunction(functionName = "avg")
+@EqualsAndHashCode(of = {
+    "entityId",
+    "timeBucket"
+})
 public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
     @Setter
     @Getter
@@ -89,8 +95,8 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
     }
 
     @Override
-    public void accept(final String entityId, final Long value) {
-        this.entityId = entityId;
+    public void accept(final MeterEntity entity, final Long value) {
+        this.entityId = entity.id();
         this.summation += value;
         this.count += 1;
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index e890f53..b5b3ee6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -22,6 +22,7 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.RunningMode;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -41,6 +42,9 @@ public abstract class ModelInstaller {
      * Entrance of the storage entity installation work.
      */
     public final void install(Client client) throws StorageException {
+        // Can't create new module, as the model installation begins.
+        MeterSystem.closeMeterCreationChannel();
+
         IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
 
         List<Model> models = modelGetter.allModels();
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index afd6b52..9a86352 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
 import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
@@ -55,14 +57,13 @@ public class PrometheusFetcherProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException, ModuleStartException {
-
+        final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
+        meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
     }
 
     @Override
     public void start() throws ServiceNotProvidedException, ModuleStartException {
-        final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
-        service.create(
-            "test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
+
     }
 
     @Override
@@ -73,7 +74,8 @@ public class PrometheusFetcherProvider extends ModuleProvider {
             @Override
             public void run() {
                 final AcceptableValue<Long> value = testLongMetrics.createNew();
-                value.accept("abc", 5L);
+                value.accept(MeterEntity.newService("abc"), 5L);
+                value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
                 service.doStreamingCalculation(value);
             }
         }, 2, 2, TimeUnit.SECONDS);