You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/09/17 05:10:27 UTC
[incubator-seatunnel] branch dev updated: [Improve][e2e] Add driver-jar to lib (#2719)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d64d452c8 [Improve][e2e] Add driver-jar to lib (#2719)
d64d452c8 is described below
commit d64d452c86534547bbd17ef8659058a58db670b0
Author: liugddx <80...@qq.com>
AuthorDate: Sat Sep 17 13:10:21 2022 +0800
[Improve][e2e] Add driver-jar to lib (#2719)
* [Improve][e2e] add driver-jar to lib
* discover third-party jars
* Remove excess code
* modify e2e test case
* modify e2e test case
* fix e2e error: if I run docker on another machine, the test case will fail
* fix some bug
* fix some bug
* fix some bug
* use root to start spark container.
* move `addURLToClassLoader` into `registerPlugin`
* move `ADD_URL_TO_CLASSLOADER` into `FlinkCommon`
---
.../org/apache/seatunnel/common/config/Common.java | 2 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 3 +-
.../core/starter/flink/config/FlinkCommon.java | 42 +++++++++++++
.../execution/AbstractPluginExecuteProcessor.java | 14 +----
.../starter/flink/execution/FlinkExecution.java | 8 ++-
.../seatunnel/e2e/common/AbstractContainer.java | 6 ++
.../e2e/common/AbstractSparkContainer.java | 5 +-
.../connector-jdbc-flink-e2e/pom.xml | 3 +-
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 18 ++++--
.../seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java | 9 ++-
.../e2e/flink/v2/jdbc/JdbcGreenplumIT.java | 40 +++++++-----
.../seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java | 71 +++++++++++----------
.../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java | 18 ++++--
.../seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java | 8 +--
.../e2e/spark/v2/jdbc/JdbcGreenplumIT.java | 41 +++++++-----
.../seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java | 73 +++++++++++-----------
.../plugin/discovery/AbstractPluginDiscovery.java | 14 ++---
17 files changed, 223 insertions(+), 152 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 294cc4965..28b64a728 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -145,7 +145,7 @@ public class Common {
/**
* return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'.
*/
- public static List<Path> getPluginsJarDependencies(){
+ public static List<Path> getPluginsJarDependencies() {
Path pluginRootDir = Common.pluginRootDir();
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
return Collections.emptyList();
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 68b6ad284..3aa3e9404 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -28,7 +28,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>connector-jdbc</artifactId>
-
+
<properties>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<mysql.version>8.0.16</mysql.version>
@@ -49,6 +49,7 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkCommon.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkCommon.java
new file mode 100644
index 000000000..7d6dcac55
--- /dev/null
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkCommon.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.config;
+
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.function.BiConsumer;
+
+public class FlinkCommon {
+
+ /**
+ * Add jar url to classloader. The different engine should have different logic to add url into
+ * their own classloader
+ */
+ public static BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> {
+ if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+ URLClassLoader c = (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
+ ReflectionUtils.invoke(c, "addURL", url);
+ } else if (classLoader instanceof URLClassLoader) {
+ ReflectionUtils.invoke(classLoader, "addURL", url);
+ } else {
+ throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
+ }
+ };
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 1b680b9e8..04a512b99 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -21,7 +21,7 @@ import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
@@ -33,7 +33,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.net.URL;
-import java.net.URLClassLoader;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
@@ -47,16 +46,7 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME = "plugin_name";
- protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = (classLoader, url) -> {
- if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
- URLClassLoader c = (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
- ReflectionUtils.invoke(c, "addURL", url);
- } else if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
- } else {
- throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
- }
- };
+ protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = FlinkCommon.ADD_URL_TO_CLASSLOADER;
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
JobContext jobContext,
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index a4c52638a..51b36e7d2 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.TaskExecution;
+import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
import org.apache.seatunnel.core.starter.flink.config.FlinkEnvironmentFactory;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -54,10 +55,10 @@ public class FlinkExecution implements TaskExecution {
this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
JobContext jobContext = new JobContext();
jobContext.setJobMode(flinkEnvironment.getJobMode());
+ registerPlugin();
this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.TRANSFORM));
this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SINK));
- registerPlugin();
}
@Override
@@ -75,7 +76,7 @@ public class FlinkExecution implements TaskExecution {
}
}
- private void registerPlugin(){
+ private void registerPlugin() {
List<URL> pluginsJarDependencies = Common.getPluginsJarDependencies().stream()
.map(Path::toUri)
.map(uri -> {
@@ -86,6 +87,9 @@ public class FlinkExecution implements TaskExecution {
}
})
.collect(Collectors.toList());
+
+ pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
+
flinkEnvironment.registerPlugin(pluginsJarDependencies);
}
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
index 71e93ab92..c1550d993 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
@@ -64,6 +64,10 @@ public abstract class AbstractContainer {
protected abstract List<String> getExtraStartShellCommands();
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ //do nothing
+ }
+
protected void copySeaTunnelStarter(GenericContainer<?> container) {
ContainerUtil.copySeaTunnelStarter(container,
this.startModuleName,
@@ -81,6 +85,8 @@ public abstract class AbstractContainer {
getConnectorNamePrefix(),
getConnectorType(),
getSeaTunnelHomeInContainer());
+ // execute extra commands
+ executeExtraCommands(container);
return executeCommand(container, confInContainerPath);
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
index 6cb49b5a0..3c10c8181 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
@@ -61,7 +61,8 @@ public abstract class AbstractSparkContainer extends AbstractContainer {
.withNetworkAliases("spark-master")
.withExposedPorts()
.withEnv("SPARK_MODE", "master")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .withCreateContainerCmdModifier(cmd -> cmd.withUser("root"));
// In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
// start a worker.
Startables.deepStart(Stream.of(master)).join();
@@ -79,7 +80,7 @@ public abstract class AbstractSparkContainer extends AbstractContainer {
@Override
protected List<String> getExtraStartShellCommands() {
return Arrays.asList("--master local",
- "--deploy-mode client");
+ "--deploy-mode client");
}
public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index eb9ed148f..1d46370b6 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -72,6 +72,7 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
@@ -80,4 +81,4 @@
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index f83781571..9164a7f5c 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@@ -47,14 +48,15 @@ import java.util.stream.Stream;
public class FakeSourceToJdbcIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
private PostgreSQLContainer<?> psl;
+ private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
- .withNetwork(NETWORK)
- .withNetworkAliases("postgresql")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Class.forName(psl.getDriverClassName());
@@ -70,8 +72,8 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
Statement statement = connection.createStatement();
String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL\n" +
- ")";
+ " name varchar(255) NOT NULL\n" +
+ ")";
statement.execute(sql);
} catch (SQLException e) {
throw new RuntimeException("Initializing PostgreSql table failed!", e);
@@ -101,4 +103,10 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
psl.stop();
}
}
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ }
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
index 861200526..67969ae81 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
@@ -51,8 +51,7 @@ public class JdbcDmdbIT extends FlinkContainer {
private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
private static final String HOST = "flink_e2e_dmdb";
- private static final String LOCAL_HOST = "localhost";
- private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+ private static final String URL = "jdbc:dm://%s:5236";
private static final String USERNAME = "SYSDBA";
private static final String PASSWORD = "SYSDBA";
private static final String DATABASE = "SYSDBA";
@@ -81,7 +80,8 @@ public class JdbcDmdbIT extends FlinkContainer {
}
private void initializeJdbcConnection() throws SQLException {
- jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+ jdbcConnection = DriverManager.getConnection(String.format(
+ URL, dbServer.getHost()), USERNAME, PASSWORD);
}
/**
@@ -110,8 +110,7 @@ public class JdbcDmdbIT extends FlinkContainer {
}
private void assertHasData(String table) {
- try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
- Statement statement = connection.createStatement();
+ try (Statement statement = jdbcConnection.createStatement();) {
String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
ResultSet source = statement.executeQuery(sql);
Assertions.assertTrue(source.next());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
index ac4366501..366106ae8 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
@@ -51,14 +51,13 @@ public class JdbcGreenplumIT extends FlinkContainer {
private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
private static final String GREENPLUM_CONTAINER_HOST = "flink_e2e_greenplum";
private static final int GREENPLUM_CONTAINER_PORT = 5432;
- private static final String GREENPLUM_HOST = "localhost";
private static final int GREENPLUM_PORT = 5435;
private static final String GREENPLUM_USER = "tester";
private static final String GREENPLUM_PASSWORD = "pivotal";
private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
- private static final String GREENPLUM_JDBC_URL = String.format(
- "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+ private static final String GREENPLUM_JDBC_URL = "jdbc:postgresql://%s:%s/testdb";
private static final List<List> TEST_DATASET = generateTestDataset();
+ private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
private GenericContainer<?> greenplumServer;
private Connection jdbcConnection;
@@ -66,11 +65,11 @@ public class JdbcGreenplumIT extends FlinkContainer {
@BeforeEach
public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withNetwork(NETWORK)
+ .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
greenplumServer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
+ String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
Startables.deepStart(Stream.of(greenplumServer)).join();
log.info("Greenplum container started");
// wait for Greenplum fully start
@@ -97,28 +96,29 @@ public class JdbcGreenplumIT extends FlinkContainer {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
result.add(Arrays.asList(
- resultSet.getInt(1),
- resultSet.getString(2)));
+ resultSet.getInt(1),
+ resultSet.getString(2)));
}
}
Assertions.assertIterableEquals(TEST_DATASET, result);
}
private void initializeJdbcConnection() throws SQLException {
- jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
- GREENPLUM_USER, GREENPLUM_PASSWORD);
+ jdbcConnection = DriverManager.getConnection(String.format(
+ GREENPLUM_JDBC_URL, greenplumServer.getHost(), GREENPLUM_PORT),
+ GREENPLUM_USER, GREENPLUM_PASSWORD);
}
private void initializeJdbcTable() throws SQLException {
try (Statement statement = jdbcConnection.createStatement()) {
String createSource = "CREATE TABLE source (\n" +
- "age INT NOT NULL,\n" +
- "name VARCHAR(255) NOT NULL\n" +
- ")";
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
String createSink = "CREATE TABLE sink (\n" +
- "age INT NOT NULL,\n" +
- "name VARCHAR(255) NOT NULL\n" +
- ")";
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
statement.execute(createSource);
statement.execute(createSink);
}
@@ -158,4 +158,10 @@ public class JdbcGreenplumIT extends FlinkContainer {
jdbcConnection.close();
}
}
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ }
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
index e71e1a505..1fdc96a86 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
@@ -48,12 +48,11 @@ public class JdbcPhoenixIT extends FlinkContainer {
private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
private static final String PHOENIX_CONTAINER_HOST = "flink_e2e_phoenix_sink";
- private static final String PHOENIX_HOST = "localhost";
private static final int PHOENIX_PORT = 8764;
private static final int PHOENIX_CONTAINER_PORT = 8765;
- private static final String PHOENIX_CONNECT_URL = String.format("jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF", PHOENIX_HOST, PHOENIX_PORT);
+ private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF";
private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
private GenericContainer<?> phoenixServer;
@@ -63,11 +62,11 @@ public class JdbcPhoenixIT extends FlinkContainer {
@BeforeEach
public void startPhoenixContainer() throws ClassNotFoundException, SQLException {
phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(PHOENIX_CONTAINER_HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withNetwork(NETWORK)
+ .withNetworkAliases(PHOENIX_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
phoenixServer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
+ String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
Startables.deepStart(Stream.of(phoenixServer)).join();
initializeJdbcConnection();
log.info("phoenix container started");
@@ -87,13 +86,13 @@ public class JdbcPhoenixIT extends FlinkContainer {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
result.add(Arrays.asList(
- resultSet.getString(1),
- resultSet.getBoolean(2),
- resultSet.getDouble(3),
- resultSet.getFloat(4),
- resultSet.getShort(5),
- resultSet.getInt(6),
- resultSet.getInt(7)));
+ resultSet.getString(1),
+ resultSet.getBoolean(2),
+ resultSet.getDouble(3),
+ resultSet.getFloat(4),
+ resultSet.getShort(5),
+ resultSet.getInt(6),
+ resultSet.getInt(7)));
}
}
Assertions.assertIterableEquals(generateTestDataset(), result);
@@ -101,30 +100,30 @@ public class JdbcPhoenixIT extends FlinkContainer {
private void initializeJdbcConnection() throws SQLException, ClassNotFoundException {
Class.forName(PHOENIX_JDBC_DRIVER);
- connection = DriverManager.getConnection(PHOENIX_CONNECT_URL);
+ connection = DriverManager.getConnection(String.format(PHOENIX_CONNECT_URL, phoenixServer.getHost(), PHOENIX_PORT));
}
private void initializePhoenixTable() {
try {
Statement statement = connection.createStatement();
String createSource = "CREATE TABLE test.source (\n" +
- "\tf1 VARCHAR PRIMARY KEY,\n" +
- "\tf2 BOOLEAN,\n" +
- "\tf3 UNSIGNED_DOUBLE,\n" +
- "\tf4 UNSIGNED_FLOAT,\n" +
- "\tf5 UNSIGNED_SMALLINT,\n" +
- "\tf6 INTEGER,\n" +
- "\tf7 UNSIGNED_INT\n" +
- ")";
+ "\tf1 VARCHAR PRIMARY KEY,\n" +
+ "\tf2 BOOLEAN,\n" +
+ "\tf3 UNSIGNED_DOUBLE,\n" +
+ "\tf4 UNSIGNED_FLOAT,\n" +
+ "\tf5 UNSIGNED_SMALLINT,\n" +
+ "\tf6 INTEGER,\n" +
+ "\tf7 UNSIGNED_INT\n" +
+ ")";
String createSink = "CREATE TABLE test.sink (\n" +
- "\tf1 VARCHAR PRIMARY KEY,\n" +
- "\tf2 BOOLEAN,\n" +
- "\tf3 UNSIGNED_DOUBLE,\n" +
- "\tf4 UNSIGNED_FLOAT,\n" +
- "\tf5 UNSIGNED_SMALLINT,\n" +
- "\tf6 INTEGER,\n" +
- "\tf7 UNSIGNED_INT\n" +
- ")";
+ "\tf1 VARCHAR PRIMARY KEY,\n" +
+ "\tf2 BOOLEAN,\n" +
+ "\tf3 UNSIGNED_DOUBLE,\n" +
+ "\tf4 UNSIGNED_FLOAT,\n" +
+ "\tf5 UNSIGNED_SMALLINT,\n" +
+ "\tf6 INTEGER,\n" +
+ "\tf7 UNSIGNED_INT\n" +
+ ")";
statement.execute(createSource);
statement.execute(createSink);
} catch (SQLException e) {
@@ -143,12 +142,12 @@ public class JdbcPhoenixIT extends FlinkContainer {
List<List> rows = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
rows.add(Arrays.asList(String.format("test_%s", i),
- i % 2 == 0,
- Double.valueOf(i + 1),
- Float.valueOf(i + 2),
- (short) (i + 3),
- Integer.valueOf(i + 4),
- i + 5
+ i % 2 == 0,
+ Double.valueOf(i + 1),
+ Float.valueOf(i + 2),
+ (short) (i + 3),
+ Integer.valueOf(i + 4),
+ i + 5
));
}
return rows;
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index bee525373..eac403996 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@@ -45,14 +46,15 @@ import java.util.stream.Stream;
public class JdbcSourceToConsoleIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
private PostgreSQLContainer<?> psl;
+ private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
- .withNetwork(NETWORK)
- .withNetworkAliases("postgresql")
- .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Class.forName(psl.getDriverClassName());
@@ -69,8 +71,8 @@ public class JdbcSourceToConsoleIT extends FlinkContainer {
try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
Statement statement = connection.createStatement();
String sql = "CREATE TABLE test (\n" +
- " name varchar(255) NOT NULL\n" +
- ")";
+ " name varchar(255) NOT NULL\n" +
+ ")";
statement.execute(sql);
} catch (SQLException e) {
throw new RuntimeException("Initializing PostgreSql table failed!", e);
@@ -106,4 +108,10 @@ public class JdbcSourceToConsoleIT extends FlinkContainer {
psl.stop();
}
}
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ }
}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
index 9845c3c20..ebe7847c1 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
@@ -53,7 +53,7 @@ public class JdbcDmdbIT extends SparkContainer {
private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
private static final String HOST = "spark_e2e_dmdb";
private static final String LOCAL_HOST = "localhost";
- private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+ private static final String URL = "jdbc:dm://%s:5236";
private static final String USERNAME = "SYSDBA";
private static final String PASSWORD = "SYSDBA";
private static final String DATABASE = "SYSDBA";
@@ -95,7 +95,8 @@ public class JdbcDmdbIT extends SparkContainer {
}
private void initializeJdbcConnection() throws SQLException {
- jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+ jdbcConnection = DriverManager.getConnection(String.format(
+ URL, dbServer.getHost()), USERNAME, PASSWORD);
}
/**
@@ -124,8 +125,7 @@ public class JdbcDmdbIT extends SparkContainer {
}
private void assertHasData(String table) {
- try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
- Statement statement = connection.createStatement();
+ try (Statement statement = jdbcConnection.createStatement();) {
String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
ResultSet source = statement.executeQuery(sql);
Assertions.assertTrue(source.next());
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
index b11910eee..61aa4b12a 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
@@ -51,14 +51,14 @@ public class JdbcGreenplumIT extends SparkContainer {
private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
private static final String GREENPLUM_CONTAINER_HOST = "spark_e2e_greenplum";
private static final int GREENPLUM_CONTAINER_PORT = 5432;
- private static final String GREENPLUM_HOST = "localhost";
private static final int GREENPLUM_PORT = 5436;
private static final String GREENPLUM_USER = "tester";
private static final String GREENPLUM_PASSWORD = "pivotal";
private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
- private static final String GREENPLUM_JDBC_URL = String.format(
- "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+ private static final String GREENPLUM_JDBC_URL = "jdbc:postgresql://%s:%s/testdb";
+
private static final List<List> TEST_DATASET = generateTestDataset();
+ private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
private GenericContainer<?> greenplumServer;
private Connection jdbcConnection;
@@ -66,11 +66,11 @@ public class JdbcGreenplumIT extends SparkContainer {
@BeforeEach
public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withNetwork(NETWORK)
+ .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
greenplumServer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
+ String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
Startables.deepStart(Stream.of(greenplumServer)).join();
log.info("Greenplum container started");
// wait for Greenplum fully start
@@ -97,28 +97,29 @@ public class JdbcGreenplumIT extends SparkContainer {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
result.add(Arrays.asList(
- resultSet.getInt(1),
- resultSet.getString(2)));
+ resultSet.getInt(1),
+ resultSet.getString(2)));
}
}
Assertions.assertIterableEquals(TEST_DATASET, result);
}
private void initializeJdbcConnection() throws SQLException {
- jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
- GREENPLUM_USER, GREENPLUM_PASSWORD);
+ jdbcConnection = DriverManager.getConnection(String.format(
+ GREENPLUM_JDBC_URL, greenplumServer.getHost(), GREENPLUM_PORT),
+ GREENPLUM_USER, GREENPLUM_PASSWORD);
}
private void initializeJdbcTable() throws SQLException {
try (Statement statement = jdbcConnection.createStatement()) {
String createSource = "CREATE TABLE source (\n" +
- "age INT NOT NULL,\n" +
- "name VARCHAR(255) NOT NULL\n" +
- ")";
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
String createSink = "CREATE TABLE sink (\n" +
- "age INT NOT NULL,\n" +
- "name VARCHAR(255) NOT NULL\n" +
- ")";
+ "age INT NOT NULL,\n" +
+ "name VARCHAR(255) NOT NULL\n" +
+ ")";
statement.execute(createSource);
statement.execute(createSink);
}
@@ -158,4 +159,10 @@ public class JdbcGreenplumIT extends SparkContainer {
jdbcConnection.close();
}
}
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/spark/seatunnel/plugins/Jdbc/lib && cd /tmp/spark/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ }
}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
index 66814cc77..34e6e20fc 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
@@ -48,12 +48,11 @@ public class JdbcPhoenixIT extends SparkContainer {
private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
private static final String PHOENIX_CONTAINER_HOST = "spark_e2e_phoenix_sink";
- private static final String PHOENIX_HOST = "localhost";
private static final int PHOENIX_PORT = 8763;
private static final int PHOENIX_CONTAINER_PORT = 8765;
- private static final String PHOENIX_CONNECT_URL = String.format("jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF", PHOENIX_HOST, PHOENIX_PORT);
+ private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF";
private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
private GenericContainer<?> phoenixServer;
@@ -63,11 +62,11 @@ public class JdbcPhoenixIT extends SparkContainer {
@BeforeEach
public void startPhoenixContainer() throws ClassNotFoundException, SQLException {
phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(PHOENIX_CONTAINER_HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withNetwork(NETWORK)
+ .withNetworkAliases(PHOENIX_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
phoenixServer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
+ String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
Startables.deepStart(Stream.of(phoenixServer)).join();
initializeJdbcConnection();
log.info("phoenix container started");
@@ -87,13 +86,13 @@ public class JdbcPhoenixIT extends SparkContainer {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
result.add(Arrays.asList(
- resultSet.getString(1),
- resultSet.getBoolean(2),
- resultSet.getDouble(3),
- resultSet.getFloat(4),
- resultSet.getShort(5),
- resultSet.getInt(6),
- resultSet.getInt(7)));
+ resultSet.getString(1),
+ resultSet.getBoolean(2),
+ resultSet.getDouble(3),
+ resultSet.getFloat(4),
+ resultSet.getShort(5),
+ resultSet.getInt(6),
+ resultSet.getInt(7)));
}
}
Assertions.assertIterableEquals(generateTestDataset(), result);
@@ -101,30 +100,30 @@ public class JdbcPhoenixIT extends SparkContainer {
private void initializeJdbcConnection() throws SQLException, ClassNotFoundException {
Class.forName(PHOENIX_JDBC_DRIVER);
- connection = DriverManager.getConnection(PHOENIX_CONNECT_URL);
+ connection = DriverManager.getConnection(String.format(PHOENIX_CONNECT_URL, phoenixServer.getHost(), PHOENIX_PORT));
}
private void initializePhoenixTable() {
- try {
+ try {
Statement statement = connection.createStatement();
String createSource = "CREATE TABLE test.source (\n" +
- "\tf1 VARCHAR PRIMARY KEY,\n" +
- "\tf2 BOOLEAN,\n" +
- "\tf3 UNSIGNED_DOUBLE,\n" +
- "\tf4 UNSIGNED_FLOAT,\n" +
- "\tf5 UNSIGNED_SMALLINT,\n" +
- "\tf6 INTEGER,\n" +
- "\tf7 UNSIGNED_INT\n" +
- ")";
+ "\tf1 VARCHAR PRIMARY KEY,\n" +
+ "\tf2 BOOLEAN,\n" +
+ "\tf3 UNSIGNED_DOUBLE,\n" +
+ "\tf4 UNSIGNED_FLOAT,\n" +
+ "\tf5 UNSIGNED_SMALLINT,\n" +
+ "\tf6 INTEGER,\n" +
+ "\tf7 UNSIGNED_INT\n" +
+ ")";
String createSink = "CREATE TABLE test.sink (\n" +
- "\tf1 VARCHAR PRIMARY KEY,\n" +
- "\tf2 BOOLEAN,\n" +
- "\tf3 UNSIGNED_DOUBLE,\n" +
- "\tf4 UNSIGNED_FLOAT,\n" +
- "\tf5 UNSIGNED_SMALLINT,\n" +
- "\tf6 INTEGER,\n" +
- "\tf7 UNSIGNED_INT\n" +
- ")";
+ "\tf1 VARCHAR PRIMARY KEY,\n" +
+ "\tf2 BOOLEAN,\n" +
+ "\tf3 UNSIGNED_DOUBLE,\n" +
+ "\tf4 UNSIGNED_FLOAT,\n" +
+ "\tf5 UNSIGNED_SMALLINT,\n" +
+ "\tf6 INTEGER,\n" +
+ "\tf7 UNSIGNED_INT\n" +
+ ")";
statement.execute(createSource);
statement.execute(createSink);
} catch (SQLException e) {
@@ -143,12 +142,12 @@ public class JdbcPhoenixIT extends SparkContainer {
List<List> rows = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
rows.add(Arrays.asList(String.format("test_%s", i),
- i % 2 == 0,
- Double.valueOf(i + 1),
- Float.valueOf(i + 2),
- (short) (i + 3),
- Integer.valueOf(i + 4),
- i + 5
+ i % 2 == 0,
+ Double.valueOf(i + 1),
+ Float.valueOf(i + 2),
+ (short) (i + 3),
+ Integer.valueOf(i + 4),
+ i + 5
));
}
return rows;
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 529049455..8fffecddd 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -64,7 +64,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
};
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
- new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+ new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
@@ -80,10 +80,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
@Override
public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
- .map(this::getPluginJarPath)
- .filter(Optional::isPresent)
- .map(Optional::get).distinct()
- .collect(Collectors.toList());
+ .map(this::getPluginJarPath)
+ .filter(Optional::isPresent)
+ .map(Optional::get).distinct()
+ .collect(Collectors.toList());
}
@Override
@@ -109,13 +109,13 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
} catch (Exception e) {
LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
- " message: " + e.getMessage());
+ " message: " + e.getMessage());
classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
}
pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
- pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
+ pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
return pluginInstance;
}
}