You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/06/01 11:42:16 UTC

[incubator-seatunnel] branch dev updated: [Core] [Fix] add plugin mapping ignore case (#1979)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 167e62c3 [Core] [Fix] add plugin mapping ignore case (#1979)
167e62c3 is described below

commit 167e62c3b38a4bb8b72351b81bf0e947894af4ab
Author: Jia Fan <10...@qq.com>
AuthorDate: Wed Jun 1 19:42:11 2022 +0800

    [Core] [Fix] add plugin mapping ignore case (#1979)
    
    * add plugin mapping ignore case
    * change getPluginMappingValue scope to default
---
 .../seatunnel/core/base/config/PluginFactory.java  | 26 +++----
 .../core/base/config/PluginFactoryTest.java        | 65 +++++++++++++++++
 .../src/test/resources/plugin-mapping.properties   | 85 ++++++++++++++++++++++
 3 files changed, 160 insertions(+), 16 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
index 82b51ef1..24d4f963 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
@@ -117,12 +118,12 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
                     List<URL> pluginList = new ArrayList<>();
                     List<? extends Config> configList = config.getConfigList(type.getType());
                     configList.forEach(pluginConfig -> {
-
-                        if (containPluginMappingValue(pluginMapping, type, pluginConfig.getString(PLUGIN_NAME_KEY))) {
+                        Optional<String> mappingValue = getPluginMappingValue(pluginMapping, type,
+                                pluginConfig.getString(PLUGIN_NAME_KEY));
+                        if (mappingValue.isPresent()) {
                             try {
                                 for (File plugin : plugins) {
-                                    if (plugin.getName().startsWith(getPluginMappingValue(pluginMapping, type,
-                                            pluginConfig.getString(PLUGIN_NAME_KEY)))) {
+                                    if (plugin.getName().startsWith(mappingValue.get())) {
                                         pluginList.add(plugin.toURI().toURL());
                                         break;
                                     }
@@ -155,19 +156,12 @@ public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
 
     }
 
-    private String getPluginMappingValue(Config pluginMapping, PluginType type, String pluginName) {
-        return pluginMapping.getConfig(this.engineType.getEngine()).getConfig(type.getType()).getString(pluginName);
-    }
+    Optional<String> getPluginMappingValue(Config pluginMapping, PluginType type, String pluginName) {
+
+        return pluginMapping.getConfig(this.engineType.getEngine()).getConfig(type.getType()).entrySet()
+                .stream().filter(entry -> entry.getKey().equalsIgnoreCase(pluginName))
+                .map(entry -> entry.getValue().unwrapped().toString()).findAny();
 
-    private boolean containPluginMappingValue(Config pluginMapping, PluginType type, String pluginName) {
-        if (pluginMapping.hasPath(this.engineType.getEngine())) {
-            Config engine = pluginMapping.getConfig(this.engineType.getEngine());
-            if (engine.hasPath(type.getType())) {
-                Config plugins = engine.getConfig(type.getType());
-                return plugins.hasPath(pluginName);
-            }
-        }
-        return false;
     }
 
     /**
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/config/PluginFactoryTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/config/PluginFactoryTest.java
new file mode 100644
index 00000000..4ba315e5
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/config/PluginFactoryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.base.config;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.spark.SparkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Objects;
+import java.util.Optional;
+
+public class PluginFactoryTest {
+
+    @Test
+    public void getPluginMappingValueTest() throws Exception {
+
+        Common.setDeployMode("cluster");
+        Config config = new ConfigBuilder<>(Paths.get(Objects.requireNonNull(ClassLoader.getSystemResource("flink.batch" +
+                ".conf")).toURI()), EngineType.SPARK).getConfig();
+
+        Config pluginMapping = ConfigFactory
+                .parseFile(new File(Objects.requireNonNull(ClassLoader.getSystemResource("plugin-mapping.properties")).toURI()))
+                .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+                .resolveWith(ConfigFactory.systemProperties(),
+                        ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+        PluginFactory<SparkEnvironment> factory = new PluginFactory<>(config, EngineType.SPARK);
+
+        Object jarPrefix = factory.getPluginMappingValue(pluginMapping, PluginType.SOURCE, "fake");
+        Assert.assertEquals(jarPrefix, Optional.of("seatunnel-connector-spark-fake"));
+
+        Object jarPrefix2 = factory.getPluginMappingValue(pluginMapping, PluginType.SINK, "console");
+        Assert.assertEquals(jarPrefix2, Optional.of("seatunnel-connector-spark-console"));
+
+        Object jarPrefix3 = factory.getPluginMappingValue(pluginMapping, PluginType.SOURCE, "FaKE");
+        Assert.assertEquals(jarPrefix3, Optional.of("seatunnel-connector-spark-fake"));
+
+        Object jarPrefix4 = factory.getPluginMappingValue(pluginMapping, PluginType.SINK, "HbASe");
+        Assert.assertEquals(jarPrefix4, Optional.of("seatunnel-connector-spark-hbase"));
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/resources/plugin-mapping.properties b/seatunnel-core/seatunnel-core-base/src/test/resources/plugin-mapping.properties
new file mode 100644
index 00000000..619911fe
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/resources/plugin-mapping.properties
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This mapping is used to resolve the Jar package name without version (or call artifactId)
+# corresponding to the module in the user Config, helping SeaTunnel to load the correct Jar package.
+
+# Flink Source
+flink.source.DruidSource = seatunnel-connector-flink-druid
+flink.source.FakeSource = seatunnel-connector-flink-fake
+flink.source.FakeSourceStream = seatunnel-connector-flink-fake
+flink.source.FileSource = seatunnel-connector-flink-file
+flink.source.InfluxDbSource = seatunnel-connector-flink-influxdb
+flink.source.JdbcSource = seatunnel-connector-flink-jdbc
+flink.source.KafkaTableStream = seatunnel-connector-flink-kafka
+flink.source.SocketStream = seatunnel-connector-flink-socket
+flink.source.Http = seatunnel-connector-flink-http
+
+# Flink Sink
+
+flink.sink.Clickhouse = seatunnel-connector-flink-clickhouse
+flink.sink.ClickhouseFile = seatunnel-connector-flink-clickhouse
+flink.sink.ConsoleSink = seatunnel-connector-flink-console
+flink.sink.DorisSink = seatunnel-connector-flink-doris
+flink.sink.DruidSink = seatunnel-connector-flink-druid
+flink.sink.ElasticSearch = seatunnel-connector-flink-elasticsearch7
+flink.sink.FileSink = seatunnel-connector-flink-file
+flink.sink.InfluxDbSink = seatunnel-connector-flink-influxdb
+flink.sink.JdbcSink = seatunnel-connector-flink-jdbc
+flink.sink.Kafka = seatunnel-connector-flink-kafka
+
+# Spark Source
+
+spark.source.ElasticSearch = seatunnel-connector-spark-elasticsearch
+spark.source.Fake = seatunnel-connector-spark-fake
+spark.source.FeishuSheet = seatunnel-connector-spark-feishu
+spark.source.File = seatunnel-connector-spark-file
+spark.source.Hbase = seatunnel-connector-spark-hbase
+spark.source.Hive = seatunnel-connector-spark-hive
+spark.source.Http = seatunnel-connector-spark-http
+spark.source.Hudi = seatunnel-connector-spark-hudi
+spark.source.Iceberg = seatunnel-connector-spark-iceberg
+spark.source.Jdbc = seatunnel-connector-spark-jdbc
+spark.source.KafkaStream = seatunnel-connector-spark-kafka
+spark.source.Kudu = seatunnel-connector-spark-kudu
+spark.source.MongoDB = seatunnel-connector-spark-mongodb
+spark.source.Neo4j = seatunnel-connector-spark-neo4j
+spark.source.Phoenix = seatunnel-connector-spark-phoenix
+spark.source.Redis = seatunnel-connector-spark-redis
+spark.source.SocketStream = seatunnel-connector-spark-socket
+spark.source.TiDB = seatunnel-connector-spark-tidb
+
+# Spark Sink
+
+spark.sink.Clickhouse = seatunnel-connector-spark-clickhouse
+spark.sink.ClickhouseFile = seatunnel-connector-spark-clickhouse
+spark.sink.Console = seatunnel-connector-spark-console
+spark.sink.Doris = seatunnel-connector-spark-doris
+spark.sink.ElasticSearch = seatunnel-connector-spark-elasticsearch
+spark.sink.Email = seatunnel-connector-spark-email
+spark.sink.File = seatunnel-connector-spark-file
+spark.sink.Hbase = seatunnel-connector-spark-hbase
+spark.sink.Hive = seatunnel-connector-spark-hive
+spark.sink.Hudi = seatunnel-connector-spark-hudi
+spark.sink.Iceberg = seatunnel-connector-spark-iceberg
+spark.sink.Jdbc = seatunnel-connector-spark-jdbc
+spark.sink.Kafka = seatunnel-connector-spark-kafka
+spark.sink.Kudu = seatunnel-connector-spark-kudu
+spark.sink.MongoDB = seatunnel-connector-spark-mongodb
+spark.sink.Phoenix = seatunnel-connector-spark-phoenix
+spark.sink.Redis = seatunnel-connector-spark-redis
+spark.sink.TiDB = seatunnel-connector-spark-tidb