You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/23 04:02:56 UTC
[skywalking] branch metrics-system updated: Shape the prototype
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch metrics-system
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/metrics-system by this push:
new e568dd6 Shape the prototype
e568dd6 is described below
commit e568dd61b22fb4a1fed249a9d72b9375ab3450f7
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Apr 23 12:01:58 2020 +0800
Shape the prototype
---
.../oap/server/core/CoreModuleProvider.java | 8 +-
.../server/core/analysis/meter/MeterEntity.java | 82 ++++++++++++++++++++
.../server/core/analysis/meter/MeterSystem.java | 87 +++++++++++++++++-----
.../analysis/meter/function/AcceptableValue.java | 5 +-
.../server/core/analysis/meter/function/Avg.java | 10 ++-
.../server/core/storage/model/ModelInstaller.java | 4 +
.../provider/PrometheusFetcherProvider.java | 12 +--
7 files changed, 177 insertions(+), 31 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 531a8c5..3db523f 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -155,12 +155,8 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
- try {
- MeterSystem meterSystem = new MeterSystem(getManager());
- this.registerServiceImplementation(MeterSystem.class, meterSystem);
- } catch (IOException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
+ MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
+ this.registerServiceImplementation(MeterSystem.class, meterSystem);
AnnotationScan oalDisable = new AnnotationScan();
oalDisable.registerListener(DisableRegister.INSTANCE);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
new file mode 100644
index 0000000..a006dbe
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+
+/**
+ * MeterEntity represents the entity in the meter system.
+ */
+@EqualsAndHashCode
+@ToString
+public class MeterEntity {
+ private ScopeType scopeType;
+ private String serviceName;
+ private String instanceName;
+ private String endpointName;
+
+ private MeterEntity(final ScopeType scopeType,
+ final String serviceName,
+ final String instanceName,
+ final String endpointName) {
+ this.scopeType = scopeType;
+ this.serviceName = serviceName;
+ this.instanceName = instanceName;
+ this.endpointName = endpointName;
+ }
+
+ public String id() {
+ switch (scopeType) {
+ case SERVICE:
+ // The meter system only deals with normal services, because it doesn't conjecture any node.
+ return IDManager.ServiceID.buildId(serviceName, true);
+ case SERVICE_INSTANCE:
+ return IDManager.ServiceInstanceID.buildId(
+ IDManager.ServiceID.buildId(serviceName, true), instanceName);
+ case ENDPOINT:
+ return IDManager.EndpointID.buildId(IDManager.ServiceID.buildId(serviceName, true), endpointName);
+ default:
+ throw new UnexpectedException("Unexpected scope type of entity " + this.toString());
+ }
+ }
+
+ /**
+ * Create a service level meter entity.
+ */
+ public static MeterEntity newService(String serviceName) {
+ return new MeterEntity(ScopeType.SERVICE, serviceName, null, null);
+ }
+
+ /**
+ * Create a service instance level meter entity.
+ */
+ public static MeterEntity newServiceInstance(String serviceName, String serviceInstance) {
+ return new MeterEntity(ScopeType.SERVICE_INSTANCE, serviceName, serviceInstance, null);
+ }
+
+ /**
+ * Create an endpoint level meter entity.
+ */
+ public static MeterEntity newEndpoint(String serviceName, String endpointName) {
+ return new MeterEntity(ScopeType.ENDPOINT, serviceName, null, endpointName);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index aba1f5b..b8d6e34 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -23,7 +23,9 @@ import com.google.common.reflect.ClassPath;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javassist.CannotCompileException;
import javassist.ClassPool;
@@ -32,6 +34,7 @@ import javassist.CtConstructor;
import javassist.CtNewConstructor;
import javassist.CtNewMethod;
import javassist.NotFoundException;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -53,20 +56,36 @@ import org.apache.skywalking.oap.server.library.module.Service;
@Slf4j
public class MeterSystem implements Service {
private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic.";
- private ModuleManager MANAGER;
- private ClassPool CLASS_POOL;
- private Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
+ private static ModuleManager MANAGER;
+ private static ClassPool CLASS_POOL;
+ private static List<NewMeter> TO_BE_CREATED_METERS = new ArrayList<>();
+ private static Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
/**
* Host the dynamic meter prototype classes. These classes could be create dynamically through {@link
* Object#clone()} in the runtime;
*/
private static Map<String, MeterDefinition> METER_PROTOTYPES = new HashMap<>();
+ private static MeterSystem METER_SYSTEM;
+ private static boolean METER_CREATABLE = true;
+
+ private MeterSystem() {
+
+ }
+
+ public synchronized static MeterSystem meterSystem(final ModuleManager manager) {
+ if (METER_SYSTEM != null) {
+ return METER_SYSTEM;
+ }
- public MeterSystem(final ModuleManager manager) throws IOException {
MANAGER = manager;
CLASS_POOL = ClassPool.getDefault();
- ClassPath classpath = ClassPath.from(MeterSystem.class.getClassLoader());
+ ClassPath classpath = null;
+ try {
+ classpath = ClassPath.from(MeterSystem.class.getClassLoader());
+ } catch (IOException e) {
+ throw new UnexpectedException("Load class path failure.");
+ }
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
for (ClassPath.ClassInfo classInfo : classes) {
Class<?> functionClass = classInfo.load();
@@ -83,10 +102,12 @@ public class MeterSystem implements Service {
);
}
}
+ METER_SYSTEM = new MeterSystem();
+ return METER_SYSTEM;
}
/**
- * Create streaming calculable {@link AcceptableValue}. This methods is synchronized due to heavy implementation
+ * Create streaming calculation of the given metrics name. This method is synchronized due to heavy implementation
* including creating dynamic class. Don't use this in concurrency runtime.
*
* @param metricsName The name used as the storage eneity and in the query stage.
@@ -99,10 +120,32 @@ public class MeterSystem implements Service {
String functionName,
ScopeType type,
Class<T> dataType) throws IllegalArgumentException {
- MeterDefinition meterDefinition = METER_PROTOTYPES.get(metricsName);
- if (meterDefinition != null) {
+ if (!METER_CREATABLE) {
+ throw new IllegalStateException("Can't create new metrics anymore");
+ }
+
+ final NewMeter newMeter = new NewMeter(metricsName, functionName, type, dataType);
+ if (TO_BE_CREATED_METERS.contains(newMeter)) {
return false;
- } else {
+ }
+
+ TO_BE_CREATED_METERS.add(newMeter);
+ return true;
+ }
+
+ /**
+ * Close the {@link #create(String, String, ScopeType, Class)} channel, and build the model and streaming
+ * definitions.
+ */
+ public static void closeMeterCreationChannel() {
+ METER_CREATABLE = false;
+
+ TO_BE_CREATED_METERS.forEach(newMeter -> {
+ String metricsName = newMeter.metricsName;
+ String functionName = newMeter.functionName;
+ ScopeType type = newMeter.type;
+ Class<?> dataType = newMeter.dataType;
+
/**
* Create a new meter class dynamically.
*/
@@ -135,7 +178,8 @@ public class MeterSystem implements Service {
try {
parentClass = CLASS_POOL.get(meterFunction.getCanonicalName());
if (!Metrics.class.isAssignableFrom(meterFunction)) {
- throw new IllegalArgumentException("Function " + functionName + " doesn't inherit from Metrics.");
+ throw new IllegalArgumentException(
+ "Function " + functionName + " doesn't inherit from Metrics.");
}
} catch (NotFoundException e) {
throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist.");
@@ -147,7 +191,8 @@ public class MeterSystem implements Service {
* Create empty construct
*/
try {
- CtConstructor defaultConstructor = CtNewConstructor.make("public " + className + "() {}", metricsClass);
+ CtConstructor defaultConstructor = CtNewConstructor.make(
+ "public " + className + "() {}", metricsClass);
metricsClass.addConstructor(defaultConstructor);
} catch (CannotCompileException e) {
log.error("Can't add empty constructor in " + className + ".", e);
@@ -183,17 +228,16 @@ public class MeterSystem implements Service {
metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class),
targetClass
);
- return true;
} catch (CannotCompileException | IllegalAccessException | InstantiationException e) {
log.error("Can't compile/load/init " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
- }
+ });
}
/**
* Create an {@link AcceptableValue} instance for streaming calculation. AcceptableValue instance is stateful,
- * shouldn't do {@link AcceptableValue#accept(String, Object)} once it is pushed into {@link
+ * shouldn't do {@link AcceptableValue#accept(MeterEntity, Object)} once it is pushed into {@link
* #doStreamingCalculation(AcceptableValue)}.
*
* @param metricsName A defined metrics name. Use {@link #create(String, String, ScopeType, Class)} to define a new
@@ -207,7 +251,7 @@ public class MeterSystem implements Service {
if (meterDefinition == null) {
throw new IllegalArgumentException("Uncreated metrics " + metricsName);
}
- if (!meterDefinition.getScopeType().equals(dataType)) {
+ if (!meterDefinition.getDataType().equals(dataType)) {
throw new IllegalArgumentException(
"Unmatched metrics data type, request for " + dataType.getName()
+ ", but defined as " + meterDefinition.getDataType());
@@ -225,13 +269,22 @@ public class MeterSystem implements Service {
MetricsStreamProcessor.getInstance().in((Metrics) acceptableValue);
}
- private String formatName(String metricsName) {
+ private static String formatName(String metricsName) {
return metricsName.toLowerCase();
}
@RequiredArgsConstructor
+ @EqualsAndHashCode
+ public static class NewMeter {
+ private final String metricsName;
+ private final String functionName;
+ private final ScopeType type;
+ private final Class<?> dataType;
+ }
+
+ @RequiredArgsConstructor
@Getter
- private class MeterDefinition {
+ private static class MeterDefinition {
private final ScopeType scopeType;
private final AcceptableValue meterPrototype;
private final Class<?> dataType;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
index e45c4fa..9bd9cdd 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
@@ -18,13 +18,14 @@
package org.apache.skywalking.oap.server.core.analysis.meter.function;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/**
* Indicate this function accepting the data of type T.
*/
public interface AcceptableValue<T> {
- void accept(String entityId, T value);
+ void accept(MeterEntity entity, T value);
/**
* @return a new instance based on the implementation, it should be the same class.
@@ -35,4 +36,6 @@ public interface AcceptableValue<T> {
* @return builder
*/
Class<? extends StorageBuilder> builder();
+
+ void setTimeBucket(long timeBucket);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
index 828d6bf..e6ed28a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
@@ -20,10 +20,12 @@ package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -31,6 +33,10 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@MeterFunction(functionName = "avg")
+@EqualsAndHashCode(of = {
+ "entityId",
+ "timeBucket"
+})
public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
@Setter
@Getter
@@ -89,8 +95,8 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
}
@Override
- public void accept(final String entityId, final Long value) {
- this.entityId = entityId;
+ public void accept(final MeterEntity entity, final Long value) {
+ this.entityId = entity.id();
this.summation += value;
this.count += 1;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index e890f53..b5b3ee6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -22,6 +22,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.RunningMode;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -41,6 +42,9 @@ public abstract class ModelInstaller {
* Entrance of the storage entity installation work.
*/
public final void install(Client client) throws StorageException {
+ // No new meters can be created once the model installation begins.
+ MeterSystem.closeMeterCreationChannel();
+
IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
List<Model> models = modelGetter.allModels();
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index afd6b52..9a86352 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.server.fetcher.prometheus.provider;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
@@ -55,14 +57,13 @@ public class PrometheusFetcherProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
-
+ final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
+ meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
- service.create(
- "test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
+
}
@Override
@@ -73,7 +74,8 @@ public class PrometheusFetcherProvider extends ModuleProvider {
@Override
public void run() {
final AcceptableValue<Long> value = testLongMetrics.createNew();
- value.accept("abc", 5L);
+ value.accept(MeterEntity.newService("abc"), 5L);
+ value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
service.doStreamingCalculation(value);
}
}, 2, 2, TimeUnit.SECONDS);