You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/09/15 16:57:22 UTC

[skywalking] branch master updated: Fixed bug #7706. Make LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 92f0ebf  Fixed bug #7706. Make LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)
92f0ebf is described below

commit 92f0ebf126fe242ee30f8ab8106b41f8ff39d656
Author: CoderGang <32...@qq.com>
AuthorDate: Thu Sep 16 00:56:55 2021 +0800

    Fixed bug #7706. Make LogHandler of kafka-fetcher-plugin can recognize namespace. (#7717)
---
 CHANGES.md                                         |  1 +
 .../kafka/provider/handler/JsonLogHandler.java     |  7 +--
 .../agent/kafka/provider/handler/LogHandler.java   |  5 +-
 .../kafka/provider/handler/LogHandlerTest.java     | 63 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 5 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c80f0a0..15e11e7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@ Release Notes.
 * Fix `ProfileThreadSnapshotQuery.queryProfiledSegments` adopts a wrong sort function
 * Support gRPC sync grouped dynamic configurations.
 * Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
+* Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
 
 #### UI
 
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java
index d9a88e7..d47ca59 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JsonLogHandler.java
@@ -18,6 +18,7 @@
 package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
 
 import java.io.IOException;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
@@ -37,15 +38,15 @@ public class JsonLogHandler extends LogHandler {
     }
 
     @Override
-    public String getTopic() {
+    public String getPlainTopic() {
         return config.getTopicNameOfJsonLogs();
     }
-    
+
     @Override
     protected String getDataFormat() {
         return "json";
     }
-    
+
     @Override
     protected LogData parseConsumerRecord(ConsumerRecord<String, Bytes> record) throws IOException {
         LogData.Builder logDataBuilder = LogData.newBuilder();
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
index d7b1760..76ff9ad 100644
--- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandler.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 @Slf4j
-public class LogHandler implements KafkaHandler {
+public class LogHandler extends AbstractKafkaHandler {
 
     private final KafkaFetcherConfig config;
     private final HistogramMetrics histogram;
@@ -41,6 +41,7 @@ public class LogHandler implements KafkaHandler {
 
     public LogHandler(final ModuleManager moduleManager,
                       final KafkaFetcherConfig config) {
+        super(moduleManager, config);
         this.config = config;
         this.logAnalyzerService = moduleManager.find(LogAnalyzerModule.NAME)
                                                .provider()
@@ -69,7 +70,7 @@ public class LogHandler implements KafkaHandler {
     }
 
     @Override
-    public String getTopic() {
+    protected String getPlainTopic() {
         return config.getTopicNameOfLogs();
     }
 
diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandlerTest.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandlerTest.java
new file mode 100644
index 0000000..6422e9f
--- /dev/null
+++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/test/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/LogHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
+
+import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
+import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
+import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class LogHandlerTest {
+    private static final String TOPIC_NAME = "skywalking-logs";
+    private LogHandler handler = null;
+    private KafkaFetcherConfig config = new KafkaFetcherConfig();
+
+    private ModuleManager manager;
+
+    @Before
+    public void setup() {
+        final ModuleManager manager = mock(ModuleManager.class, RETURNS_DEEP_STUBS);
+        when(manager.find(LogAnalyzerModule.NAME).provider().getService(any()))
+            .thenReturn(mock(ILogAnalyzerService.class));
+        when(manager.find(TelemetryModule.NAME).provider().getService(any()))
+            .thenReturn(mock(MetricsCreator.class));
+        handler = new LogHandler(manager, config);
+    }
+
+    @Test
+    public void testGetTopic() {
+        assertEquals(handler.getTopic(), TOPIC_NAME);
+
+        String namespace = "product";
+        config.setNamespace(namespace);
+        assertEquals(namespace + "-" + TOPIC_NAME, handler.getTopic());
+    }
+}
\ No newline at end of file