You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/09/15 16:57:22 UTC
[skywalking] branch master updated: Fixed bug #7706. Make
LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 92f0ebf Fixed bug #7706. Make LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)
92f0ebf is described below
commit 92f0ebf126fe242ee30f8ab8106b41f8ff39d656
Author: CoderGang <32...@qq.com>
AuthorDate: Thu Sep 16 00:56:55 2021 +0800
Fixed bug #7706. Make LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)
---
CHANGES.md | 1 +
.../kafka/provider/handler/JsonLogHandler.java | 7 +--
.../agent/kafka/provider/handler/LogHandler.java | 5 +-
.../kafka/provider/handler/LogHandlerTest.java | 63 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index c80f0a0..15e11e7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@ Release Notes.
* Fix `ProfileThreadSnapshotQuery.queryProfiledSegments` adopts a wrong sort function
* Support gRPC sync grouped dynamic configurations.
* Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
+* Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
#### UI
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java
index d9a88e7..d47ca59 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import java.io.IOException;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
@@ -37,15 +38,15 @@ public class JsonLogHandler extends LogHandler {
}
@Override
- public String getTopic() {
+ public String getPlainTopic() {
return config.getTopicNameOfJsonLogs();
}
-
+
@Override
protected String getDataFormat() {
return "json";
}
-
+
@Override
protected LogData parseConsumerRecord(ConsumerRecord<String, Bytes> record) throws IOException {
LogData.Builder logDataBuilder = LogData.newBuilder();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
index d7b1760..76ff9ad 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@Slf4j
-public class LogHandler implements KafkaHandler {
+public class LogHandler extends AbstractKafkaHandler {
private final KafkaFetcherConfig config;
private final HistogramMetrics histogram;
@@ -41,6 +41,7 @@ public class LogHandler implements KafkaHandler {
public LogHandler(final ModuleManager moduleManager,
final KafkaFetcherConfig config) {
+ super(moduleManager, config);
this.config = config;
this.logAnalyzerService = moduleManager.find(LogAnalyzerModule.NAME)
.provider()
@@ -69,7 +70,7 @@ public class LogHandler implements KafkaHandler {
}
@Override
- public String getTopic() {
+ protected String getPlainTopic() {
return config.getTopicNameOfLogs();
}
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandlerTest.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandlerTest.java
new file mode 100644
index 0000000..6422e9f
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
+
+import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
+import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class LogHandlerTest {
+ private static final String TOPIC_NAME = "skywalking-logs";
+ private LogHandler handler = null;
+ private KafkaFetcherConfig config = new KafkaFetcherConfig();
+
+ private ModuleManager manager;
+
+ @Before
+ public void setup() {
+ final ModuleManager manager = mock(ModuleManager.class, RETURNS_DEEP_STUBS);
+ when(manager.find(LogAnalyzerModule.NAME).provider().getService(any()))
+ .thenReturn(mock(ILogAnalyzerService.class));
+ when(manager.find(TelemetryModule.NAME).provider().getService(any()))
+ .thenReturn(mock(MetricsCreator.class));
+ handler = new LogHandler(manager, config);
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(handler.getTopic(), TOPIC_NAME);
+
+ String namespace = "product";
+ config.setNamespace(namespace);
+ assertEquals(namespace + "-" + TOPIC_NAME, handler.getTopic());
+ }
+}
\ No newline at end of file