You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 07:13:46 UTC

[incubator-inlong] branch release-1.2.0 updated (c7f5ed520 -> 0a5220d79)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


    from c7f5ed520 [INLONG-4613][Release] Change the tag of Docker images to 1.2.0-incubating (#4614)
     new 10e328485 [INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)
     new 0a5220d79 [INLONG-4615][Manager] PluginClassLoader adapts to the Windows system (#4617)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../agent/plugin/sources/TestKafkaReader.java      | 64 +++++++++-------------
 .../service/core/plugin/PluginClassLoader.java     | 20 ++++++-
 .../service/core/plugin/PluginClassLoaderTest.java |  3 +-
 3 files changed, 43 insertions(+), 44 deletions(-)


[incubator-inlong] 02/02: [INLONG-4615][Manager] PluginClassLoader adapts to the Windows system (#4617)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 0a5220d79342130d63152d563e702d8c80cdccea
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Jun 10 15:07:34 2022 +0800

    [INLONG-4615][Manager] PluginClassLoader adapts to the Windows system (#4617)
---
 .../service/core/plugin/PluginClassLoader.java       | 20 +++++++++++++++++---
 .../service/core/plugin/PluginClassLoaderTest.java   |  3 +--
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
index 29a56b538..34c673393 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
@@ -51,6 +51,8 @@ public class PluginClassLoader extends URLClassLoader {
 
     public static final String PLUGIN_PATH = "META-INF/plugin.yaml";
 
+    public static final String WINDOWS_PREFIX = "win";
+
     /**
      * plugin.yaml should less than 1k
      */
@@ -61,10 +63,12 @@ public class PluginClassLoader extends URLClassLoader {
      */
     private Map<String, PluginDefinition> pluginDefinitionMap = new HashMap<>();
     private ObjectMapper yamlMapper;
+    private String osName;
 
-    private PluginClassLoader(URL url, ClassLoader parent) throws IOException {
+    private PluginClassLoader(URL url, ClassLoader parent, String osName) throws IOException {
         super(new URL[]{url}, parent);
         this.pluginDirectory = new File(url.getPath());
+        this.osName = osName;
         initYamlMapper();
         loadPluginDefinition();
     }
@@ -73,13 +77,19 @@ public class PluginClassLoader extends URLClassLoader {
      * Get pluginClassLoader by plugin url.
      */
     public static PluginClassLoader getFromPluginUrl(String url, ClassLoader parent) {
+        log.info("ClassLoaderPath:{}", url);
         checkClassLoader(parent);
         checkUrl(url);
         return AccessController.doPrivileged(new PrivilegedAction<PluginClassLoader>() {
             @SneakyThrows
             @Override
             public PluginClassLoader run() {
-                return new PluginClassLoader(new URL("file://" + url), parent);
+                String os = System.getProperty("os.name").toLowerCase();
+                if (os.startsWith(WINDOWS_PREFIX)) {
+                    return new PluginClassLoader(new URL("file:///" + url), parent, os);
+                } else {
+                    return new PluginClassLoader(new URL("file://" + url), parent, os);
+                }
             }
         });
     }
@@ -139,7 +149,11 @@ public class PluginClassLoader extends URLClassLoader {
             String pluginDef = readPluginDef(pluginJar);
             pluginDef = pluginDef.replaceAll("[\\x00]+", "");
             PluginDefinition definition = yamlMapper.readValue(pluginDef, PluginDefinition.class);
-            addURL(new URL("file://" + jarFile.getAbsolutePath()));
+            if (osName.startsWith(WINDOWS_PREFIX)) {
+                addURL(new URL("file:///" + jarFile.getAbsolutePath()));
+            } else {
+                addURL(new URL("file://" + jarFile.getAbsolutePath()));
+            }
             checkPluginValid(jarFile, definition);
             definitions.add(definition);
         }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
index 7402551af..4a7535d76 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
@@ -38,8 +38,7 @@ public class PluginClassLoaderTest {
 
         String path = this.getClass().getClassLoader().getResource("").getPath();
         PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
-                Thread.currentThread()
-                        .getContextClassLoader());
+                Thread.currentThread().getContextClassLoader());
         Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
         Assert.assertEquals(1, pluginDefinitionMap.size());
         PluginDefinition pluginDefinition = Lists.newArrayList(pluginDefinitionMap.values()).get(0);


[incubator-inlong] 01/02: [INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 10e328485a19e80611194521e17b63196ecff0bd
Author: zk1510 <59...@qq.com>
AuthorDate: Fri Jun 10 15:02:46 2022 +0800

    [INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)
---
 .../agent/plugin/sources/TestKafkaReader.java      | 64 +++++++++-------------
 1 file changed, 25 insertions(+), 39 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
index 9e28f6586..f9f118533 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
@@ -17,51 +17,37 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import java.util.List;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class TestKafkaReader {
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaReader.class);
+public class TestKafkaReader {
 
     @Test
-    public void testKafkaReader() {
-        KafkaSource kafkaSource = new KafkaSource();
-        JobProfile conf = JobProfile.parseJsonStr("{}");
-        conf.set("job.kafkaJob.topic", "test3");
-        conf.set("job.kafkaJob.bootstrap.servers", "127.0.0.1:9092");
-        conf.set("job.kafkaJob.group.id", "test_group1");
-        conf.set("job.kafkaJob.recordspeed.limit", "1");
-        conf.set("job.kafkaJob.bytespeed.limit", "1");
-        conf.set("job.kafkaJob.partition.offset", "0#0");
-        conf.set("job.kafkaJob.autoOffsetReset", "latest");
-        conf.set("proxy.inlongGroupId", "");
-        conf.set("proxy.inlongStreamId", "");
+    public void testKafkaConsumerInit() {
+        MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final String topic = "my_topic";
 
-        try {
-            List<Reader> readers = kafkaSource.split(conf);
-            LOGGER.info("total readers by split after:{}", readers.size());
-            readers.forEach(reader -> {
-                reader.init(conf);
-                Runnable runnable = () -> {
-                    while (!reader.isFinished()) {
-                        Message msg = reader.read();
-                        if (msg != null) {
-                            LOGGER.info(new String(msg.getBody()));
-                        }
-                    }
-                    LOGGER.info("reader is finished!");
-                };
-                // start thread
-                new Thread(runnable).start();
-            });
-        } catch (Exception e) {
-            LOGGER.error("get record failed:", e);
+        mockConsumer.assign(Collections.singletonList(new TopicPartition(topic, 0)));
+        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(new TopicPartition(topic, 0), 0L);
+        mockConsumer.updateBeginningOffsets(beginningOffsets);
+
+        mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, "test_key", "test_value"));
+        ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(1000));
+        for (ConsumerRecord<String, String> record : records) {
+            byte[] recordValue = record.value().getBytes(StandardCharsets.UTF_8);
+            Assert.assertArrayEquals("test_value".getBytes(StandardCharsets.UTF_8), recordValue);
         }
     }
+
 }