You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/08/13 02:44:51 UTC
[skywalking] 01/01: DataCarrier changes a `#consume` API to add
properties as a parameter to initialize consumer.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch enhance-datacarrier
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit e1b4fc16b9d84afb43ceeb7303ac8e59074267d3
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Aug 13 10:44:39 2021 +0800
DataCarrier changes a `#consume` API to add properties as a parameter to initialize consumer.
---
CHANGES.md | 5 ++++-
.../apm/commons/datacarrier/DataCarrier.java | 11 ++++++++---
.../datacarrier/consumer/ConsumeDriver.java | 22 +++++++++++++++-------
.../commons/datacarrier/consumer/IConsumer.java | 3 ++-
.../apm/commons/datacarrier/DataCarrierTest.java | 3 ++-
.../commons/datacarrier/consumer/ConsumerTest.java | 3 ++-
.../datacarrier/consumer/SampleConsumer.java | 3 ++-
.../agent/core/remote/LogReportServiceClient.java | 3 ++-
.../core/remote/TraceSegmentServiceClient.java | 3 ++-
.../core/kafka/KafkaTraceSegmentServiceClient.java | 3 ++-
.../exporter/provider/grpc/GRPCExporter.java | 3 ++-
.../exporter/provider/grpc/GRPCExporterTest.java | 2 +-
.../analysis/worker/MetricsAggregateWorker.java | 3 ++-
.../analysis/worker/MetricsPersistentWorker.java | 3 ++-
.../server/core/analysis/worker/TopNWorker.java | 3 ++-
.../core/remote/client/GRPCRemoteClient.java | 3 ++-
.../storage/plugin/jdbc/h2/dao/H2BatchDAO.java | 3 ++-
17 files changed, 54 insertions(+), 25 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index e18df0f..4f21273 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,7 +6,10 @@ Release Notes.
------------------
#### Project
+
* Upgrade jdk 11 in dockerfile and remove unused java_opts.
+* DataCarrier changes a `#consume` API to add properties as a parameter to initialize consumer when
+ use `Class<? extends IConsumer<T>> consumerClass`.
#### Java Agent
@@ -31,7 +34,7 @@ Release Notes.
MacOS.
* [Break Change] Remove page path in the browser log query condition. Only support `query by page path id`.
* [Break Change] Remove endpoint name in the backend log query condition. Only support `query by endpoint id`.
-* [Break Change] Fix typo for a column `page_path_id`(was `pate_path_id`) of storage entity `browser_error_log`.
+* [Break Change] Fix typo for a column `page_path_id`(was `pate_path_id`) of storage entity `browser_error_log`.
* Add component id for Python falcon plugin.
#### UI
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 66d3ff8..4f5eabd 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.commons.datacarrier;
+import java.util.Properties;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumeDriver;
@@ -90,12 +91,16 @@ public class DataCarrier<T> {
*
* @param consumerClass class of consumer
* @param num number of consumer threads
+ * @param properties for initializing consumer.
*/
- public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
+ public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass,
+ int num,
+ long consumeCycle,
+ Properties properties) {
if (driver != null) {
driver.close(channels);
}
- driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle);
+ driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle, properties);
driver.begin(channels);
return this;
}
@@ -108,7 +113,7 @@ public class DataCarrier<T> {
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
- return this.consume(consumerClass, num, 20);
+ return this.consume(consumerClass, num, 20, new Properties());
}
/**
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
index 1ceb765..a6d808f 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
@@ -31,20 +32,27 @@ public class ConsumeDriver<T> implements IDriver {
private Channels<T> channels;
private ReentrantLock lock;
- public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
- long consumeCycle) {
+ public ConsumeDriver(String name,
+ Channels<T> channels, Class<? extends IConsumer<T>> consumerClass,
+ int num,
+ long consumeCycle,
+ Properties properties) {
this(channels, num);
for (int i = 0; i < num; i++) {
- consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
+ consumerThreads[i] = new ConsumerThread(
+ "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties),
+ consumeCycle
+ );
consumerThreads[i].setDaemon(true);
}
}
public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
- prototype.init();
+ prototype.init(new Properties());
for (int i = 0; i < num; i++) {
- consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
+ consumerThreads[i] = new ConsumerThread(
+ "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
@@ -57,10 +65,10 @@ public class ConsumeDriver<T> implements IDriver {
lock = new ReentrantLock();
}
- private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
+ private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) {
try {
IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance();
- inst.init();
+ inst.init(properties);
return inst;
} catch (InstantiationException e) {
throw new ConsumerCannotBeCreatedException(e);
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
index 07793eb..6216b39 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
@@ -19,9 +19,10 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.List;
+import java.util.Properties;
public interface IConsumer<T> {
- void init();
+ void init(final Properties properties);
void consume(List<T> data);
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index ae496a0..5d20cce 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.commons.datacarrier;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
@@ -120,7 +121,7 @@ public class DataCarrierTest {
int i = 0;
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
index 6da897d..17942d5 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.SampleData;
@@ -104,7 +105,7 @@ public class ConsumerTest {
public boolean onError = false;
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java
index b23ef03..4534abd 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java
@@ -19,13 +19,14 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.List;
+import java.util.Properties;
import org.apache.skywalking.apm.commons.datacarrier.SampleData;
public class SampleConsumer implements IConsumer<SampleData> {
public int i = 1;
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
index 498af9e..ec9ef86 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.agent.core.remote;
import java.util.List;
+import java.util.Properties;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -49,7 +50,7 @@ public class LogReportServiceClient implements BootService, IConsumer<LogData> {
}
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index c9583cc..a0b8695 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
@@ -80,7 +81,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
index ac5ae33..65181fc 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.agent.core.kafka;
import java.util.List;
import java.util.Objects;
+import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
@@ -77,7 +78,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
}
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 8758584..1221b50 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -22,6 +22,7 @@ import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -121,7 +122,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
index de42ce7..d31bb0e 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java
@@ -78,7 +78,7 @@ public class GRPCExporterTest {
@Test
public void init() {
- exporter.init();
+ exporter.init(properties);
}
@Test
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 4e7c59e..268e0ad 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.List;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
@@ -117,7 +118,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private class AggregatorConsumer implements IConsumer<Metrics> {
@Override
- public void init() {
+ public void init(final Properties properties) {
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index b216bd0..dbbc451 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -321,7 +322,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
*/
private class PersistentConsumer implements IConsumer<Metrics> {
@Override
- public void init() {
+ public void init(final Properties properties) {
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 1e9db5e..e224e62 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -100,7 +101,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
private class TopNConsumer implements IConsumer<TopN> {
@Override
- public void init() {
+ public void init(final Properties properties) {
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 51fda20..99049a6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -23,6 +23,7 @@ import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContext;
import java.util.List;
import java.util.Objects;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
@@ -153,7 +154,7 @@ public class GRPCRemoteClient implements RemoteClient {
class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@Override
- public void init() {
+ public void init(final Properties properties) {
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index 3a799f9..2101e68 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
@@ -94,7 +95,7 @@ public class H2BatchDAO implements IBatchDAO {
}
@Override
- public void init() {
+ public void init(final Properties properties) {
}