You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/27 10:49:22 UTC
[incubator-seatunnel] branch st-engine updated: [st-engine] Add engine client (#2223)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 1400f6b44 [st-engine] Add engine client (#2223)
1400f6b44 is described below
commit 1400f6b443cf6c1481a3187bd829d27179f6f73d
Author: Eric <ga...@gmail.com>
AuthorDate: Wed Jul 27 18:49:16 2022 +0800
[st-engine] Add engine client (#2223)
* tmp
* tmp
* tmp1
* fix checkstyle error.
* fix license header error
* fix error
* fix error
* add source license
* fix build error
* add seatunnel-engine-core module
* remove jet code
* fix checkstyle
* fix checkstyle
* fix checkstyle
* add seatunnel-engine-core to seatunnel-dist
* merge 8250fc8b7b87b788d0d3869c009a9baa91a2f1d0
* fix conflict
* fix conflict
* fix error
* fix ci error
* retry ci
* resolve review
* resolve review
* fix review
Co-authored-by: Kirs <ki...@apache.org>
---
.github/workflows/backend.yml | 1 -
LICENSE | 4 +-
generate_client_protocol.sh | 58 +++++++++++++
pom.xml | 12 +++
.../seatunnel/api/transform/Transformation.java | 12 +--
.../src/main/assembly/assembly-bin-ci.xml | 21 +++++
seatunnel-dist/src/main/assembly/assembly-bin.xml | 15 +++-
seatunnel-engine/pom.xml | 28 ++-----
seatunnel-engine/seatunnel-engine-client/pom.xml | 67 +++++++++++++++
.../engine/client/JobExecutionEnvironment.java | 65 +++++++++++++++
.../seatunnel/engine/client/SeaTunnelClient.java | 92 +++++++++++++++++++++
.../engine/client/SeaTunnelClientConfig.java} | 18 ++--
.../engine/client/SeaTunnelClientInstance.java} | 20 +++--
.../engine/client/SeaTunnelClientTest.java | 55 +++++++++++++
seatunnel-engine/seatunnel-engine-common/pom.xml | 46 +++++++++++
.../apache/seatunnel/engine/common/Constant.java} | 13 +--
.../seatunnel/engine/common/config/JobConfig.java} | 12 +--
.../exception/JobNotFoundExceptionSeaTunnel.java} | 17 ++--
.../exception/SeaTunnelEngineException.java} | 27 ++++--
.../engine/common/utils/ExceptionUtil.java | 96 ++++++++++++++++++++++
seatunnel-engine/seatunnel-engine-core/pom.xml | 63 ++++++++++++++
.../protocol/codec/SeaTunnelPrintMessageCodec.java | 91 ++++++++++++++++++++
.../SeaTunnelEngine.yaml} | 30 ++++++-
seatunnel-engine/seatunnel-engine-server/pom.xml | 52 ++++++++++++
.../seatunnel/engine/server/NodeExtension.java | 2 +-
.../engine/server/NodeExtensionCommon.java | 8 +-
.../engine/server/SeaTunnelNodeContext.java} | 2 +-
.../seatunnel/engine/server/SeaTunnelServer.java} | 4 +-
.../engine/server/SeaTunnelServerStarter.java} | 6 +-
.../server/operation/PrintMessageOperation.java | 75 +++++++++++++++++
.../server/protocol/task/PrintMessageTask.java | 93 +++++++++++++++++++++
.../task/SeaTunnelMessageTaskFactoryProvider.java | 46 +++++++++++
.../serializable/OperationDataSerializerHook.java | 57 +++++++++++++
.../serializable/OperationFactoryIdConstant.java} | 20 ++---
.../services/com.hazelcast.DataSerializerHook} | 7 +-
...lient.impl.protocol.MessageTaskFactoryProvider} | 7 +-
tools/checkstyle/checkStyle.xml | 4 +
tools/checkstyle/suppressions.xml | 27 ++++++
38 files changed, 1165 insertions(+), 108 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index ee6620ec7..867afa9c0 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -16,7 +16,6 @@
#
name: Backend
-
on:
push:
pull_request:
diff --git a/LICENSE b/LICENSE
index ca1720aba..09e15e69a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,4 +215,6 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java from https://github.com/lightbend/config
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java from https://github.com/lightbend/config
+generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
\ No newline at end of file
diff --git a/generate_client_protocol.sh b/generate_client_protocol.sh
new file mode 100755
index 000000000..2b26ecda3
--- /dev/null
+++ b/generate_client_protocol.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPT_DIR="$(dirname "$0")"
+SEATUNNEL_ENGINE_HOME="$(cd "$SCRIPT_DIR/"; pwd)"
+
+PYTHON="$(which python3 2>/dev/null)"
+PIP3="$(which pip3 2>/dev/null)"
+GIT="$(which git 2>/dev/null)"
+
+PROTOCOL_DIRECTORY=`mktemp -d 2>/dev/null || mktemp -d -t 'protocol'`
+
+if [ -z "$PYTHON" ]; then
+ echo "Python 3 could not be found in your system."
+ exit 1
+fi
+
+if [ -z "$PIP3" ]; then
+ echo "PIP 3 could not be found in your system."
+ exit 1
+fi
+
+if [ -z "$GIT" ]; then
+ echo "Git could not be found in your system."
+ exit 1
+fi
+
+echo $SCRIPT_DIR
+echo $SEATUNNEL_ENGINE_HOME
+echo $PROTOCOL_DIRECTORY
+
+$GIT clone --depth=1 https://github.com/hazelcast/hazelcast-client-protocol.git $PROTOCOL_DIRECTORY
+
+cd $PROTOCOL_DIRECTORY
+
+$PIP3 install -r requirements.txt
+
+$PYTHON generator.py -r $SEATUNNEL_ENGINE_HOME -p $SEATUNNEL_ENGINE_HOME/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition \
+-o seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec \
+-n org.apache.seatunnel.engine.core.protocol.codec --no-binary --no-id-check
+
+rm -rf $PROTOCOL_DIRECTORY
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4469c2d74..ab1634dec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,6 +220,8 @@
<springfox-swagger.version>2.6.1</springfox-swagger.version>
<swagger-annotations.version>1.5.10</swagger-annotations.version>
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
+
+ <!-- SeaTunnel Engine use -->
<hazelcast.version>5.1</hazelcast.version>
</properties>
@@ -925,12 +927,22 @@
<version>${hibernate.validator.version}</version>
</dependency>
+ <!-- SeaTunnel engine use begin -->
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>${hazelcast.version}</version>
</dependency>
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <scope>test</scope>
+ <version>${hazelcast.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
+ <!-- SeaTunnel engine use end -->
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/Transformation.java
similarity index 68%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/Transformation.java
index 5749f4c3a..c68be96df 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/Transformation.java
@@ -15,15 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.api.transform;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
- }
+public interface Transformation {
}
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 1f15cbc08..d4f7aaffa 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -118,6 +118,27 @@
</excludes>
<outputDirectory>/lib</outputDirectory>
</fileSet>
+ <!-- seatunnel engine -->
+ <fileSet>
+ <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
+ <includes>
+ <include>seatunnel-engine-*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+ </excludes>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+ <includes>
+ <include>seatunnel-engine-*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+ </excludes>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
<!-- connectors -->
<fileSet>
<directory>../seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib</directory>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index bd2841102..eddaf8129 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -118,10 +118,21 @@
</excludes>
<outputDirectory>/lib</outputDirectory>
</fileSet>
+ <!-- seatunnel engine -->
<fileSet>
- <directory>../seatunnel-engine/target</directory>
+ <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
<includes>
- <include>seatunnel-engine*.jar</include>
+ <include>seatunnel-engine-*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+ </excludes>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+ <includes>
+ <include>seatunnel-engine-*.jar</include>
</includes>
<excludes>
<exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
diff --git a/seatunnel-engine/pom.xml b/seatunnel-engine/pom.xml
index f3064079c..d5efd7b5a 100644
--- a/seatunnel-engine/pom.xml
+++ b/seatunnel-engine/pom.xml
@@ -17,30 +17,18 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
-
- <artifactId>seatunnel-engine</artifactId>
<packaging>pom</packaging>
+ <artifactId>seatunnel-engine</artifactId>
- <dependencies>
-
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <scope>provided</scope>
- </dependency>
-
-
- </dependencies>
-
-
+ <modules>
+ <module>seatunnel-engine-client</module>
+ <module>seatunnel-engine-common</module>
+ <module>seatunnel-engine-server</module>
+ <module>seatunnel-engine-core</module>
+ </modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
new file mode 100644
index 000000000..26c502f36
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-engine</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-engine-client</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
new file mode 100644
index 000000000..9cc0bfb61
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.transform.Transformation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JobExecutionEnvironment {
+
+ private List<Transformation> transformations;
+
+ private static String DEFAULT_JOB_NAME = "test_st_job";
+
+ private SeaTunnelClientConfig configuration;
+
+ private String jobName;
+
+ private int maxParallelism = 1;
+
+ public JobExecutionEnvironment(SeaTunnelClientConfig configuration) {
+ this.configuration = configuration;
+ }
+
+ public void addTransformation(Transformation transformation) {
+ if (transformations == null) {
+ transformations = new ArrayList<>();
+ }
+ this.transformations.add(transformation);
+ }
+
+ public List<Transformation> getTransformations() {
+ return transformations;
+ }
+
+ public void setTransformations(List<Transformation> transformations) {
+ this.transformations = transformations;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public void setMaxParallelism(int maxParallelism) {
+ this.maxParallelism = maxParallelism;
+ }
+}
+
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
new file mode 100644
index 000000000..652883a7d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
+import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.spi.impl.ClientInvocation;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.internal.serialization.SerializationService;
+import com.hazelcast.internal.util.Preconditions;
+import com.hazelcast.logging.ILogger;
+import lombok.NonNull;
+
+import java.util.UUID;
+import java.util.function.Function;
+
+public class SeaTunnelClient implements SeaTunnelClientInstance {
+ private final HazelcastClientInstanceImpl hazelcastClient;
+ private final SerializationService serializationService;
+
+ public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
+ Preconditions.checkNotNull(seaTunnelClientConfig, "config");
+ this.hazelcastClient = ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
+ this.serializationService = hazelcastClient.getSerializationService();
+ ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
+ }
+
+ @NonNull
+ @Override
+ public HazelcastInstance getHazelcastInstance() {
+ return hazelcastClient;
+ }
+
+ @Override
+ public JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config) {
+ // TODO analyze job config file and create LocalExecutionContext
+ return null;
+ }
+
+ public ILogger getLogger() {
+ return hazelcastClient.getLoggingService().getLogger(getClass());
+ }
+
+ private <S> S invokeRequestOnMasterAndDecodeResponse(ClientMessage request,
+ Function<ClientMessage, Object> decoder) {
+ UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
+ return invokeRequestAndDecodeResponse(masterUuid, request, decoder);
+ }
+
+ private <S> S invokeRequestOnAnyMemberAndDecodeResponse(ClientMessage request,
+ Function<ClientMessage, Object> decoder) {
+ return invokeRequestAndDecodeResponse(null, request, decoder);
+ }
+
+ private <S> S invokeRequestAndDecodeResponse(UUID uuid, ClientMessage request,
+ Function<ClientMessage, Object> decoder) {
+ ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
+ try {
+ ClientMessage response = invocation.invoke().get();
+ return serializationService.toObject(decoder.apply(response));
+ } catch (Throwable t) {
+ throw ExceptionUtil.rethrow(t);
+ }
+ }
+
+ public String printMessageToMaster(@NonNull String msg) {
+ return invokeRequestOnMasterAndDecodeResponse(
+ SeaTunnelPrintMessageCodec.encodeRequest(msg),
+ response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
+ );
+ }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
similarity index 64%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
index 5749f4c3a..aa15c138a 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.client;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.client.config.ClientConfig;
-public class ServerStarter {
+public class SeaTunnelClientConfig extends ClientConfig {
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+ /**
+ * Creates a new config instance with the default cluster name for SeaTunnel Engine
+ */
+ public SeaTunnelClientConfig() {
+ // TODO we should get the cluster name from the server config instead of returning a constant name.
+ super();
+ setClusterName("SeaTunnel Engine");
}
}
+
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
similarity index 62%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 5749f4c3a..34ca898d9 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.client;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.NonNull;
-public class ServerStarter {
+public interface SeaTunnelClientInstance {
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
- }
+ /**
+ * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
+ * be a Hazelcast client instance.
+ */
+ @NonNull
+ HazelcastInstance getHazelcastInstance();
+
+ JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
new file mode 100644
index 000000000..0870fa08c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@SuppressWarnings("checkstyle:MagicNumber")
+@RunWith(JUnit4.class)
+public class SeaTunnelClientTest {
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ Config config = new Config();
+ config.getSecurityConfig().setEnabled(false);
+ config.getJetConfig().setEnabled(false);
+ config.getNetworkConfig().setPort(50001);
+ HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
+ }
+
+ @Test
+ public void testSayHello() {
+ SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+ seaTunnelClientConfig.setClusterName("dev");
+ seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+
+ String msg = "Hello world";
+ String s = engineClient.printMessageToMaster(msg);
+ Assert.assertEquals(msg, s);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
new file mode 100644
index 000000000..c62993d55
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-engine</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-engine-common</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
similarity index 68%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 5749f4c3a..cb0795ee4 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -15,15 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
- }
+public class Constant {
+ public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";
}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
similarity index 68%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 5749f4c3a..d6cd39dc6 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -15,15 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common.config;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
- }
+public class JobConfig {
}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
similarity index 64%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
index 5749f4c3a..1b4158886 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
@@ -15,15 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common.exception;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+public class JobNotFoundExceptionSeaTunnel extends SeaTunnelEngineException {
+ public JobNotFoundExceptionSeaTunnel(long jobId) {
+ super("Job with id " + jobId + " not found");
+ }
-public class ServerStarter {
+ public JobNotFoundExceptionSeaTunnel(String message) {
+ super(message);
+ }
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+ public JobNotFoundExceptionSeaTunnel(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineException.java
similarity index 52%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineException.java
index 05822d331..b258b7db3 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineException.java
@@ -15,16 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common.exception;
-import com.hazelcast.instance.impl.DefaultNodeContext;
-import com.hazelcast.instance.impl.Node;
-import com.hazelcast.instance.impl.NodeExtension;
+import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
+import com.hazelcast.core.HazelcastException;
-public class NodeContext extends DefaultNodeContext {
+public class SeaTunnelEngineException extends HazelcastException implements ClientExceptionFactory.ExceptionFactory {
+ public SeaTunnelEngineException() {
+ }
+
+ public SeaTunnelEngineException(String message) {
+ super(message);
+ }
+
+ public SeaTunnelEngineException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SeaTunnelEngineException(Throwable cause) {
+ super(cause);
+ }
@Override
- public NodeExtension createNodeExtension(Node node) {
- return new org.apache.seatunnel.engine.server.NodeExtension(node);
+ public Throwable createException(String s, Throwable throwable) {
+ return new SeaTunnelEngineException(s, throwable);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
new file mode 100644
index 000000000..af9b504b7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import org.apache.seatunnel.engine.common.exception.JobNotFoundExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+
+import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
+import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
+import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
+import lombok.NonNull;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+public final class ExceptionUtil {
+ // Each entry: client-protocol error code (offset from USER_EXCEPTIONS_RANGE_START), exception class, and its factory.
+ private static final List<ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory>> EXCEPTIONS = Arrays.asList(
+ new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START, SeaTunnelEngineException.class, SeaTunnelEngineException::new),
+ new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new)
+ );
+
+ private ExceptionUtil() { // static utility holder: never instantiated
+ }
+
+ /**
+ * Registers every entry of {@code EXCEPTIONS} with {@code factory}; meant to be called during startup so Hazelcast knows our exceptions.
+ */
+ public static void registerSeaTunnelExceptions(@NonNull ClientExceptionFactory factory) {
+ for (ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory> exception : EXCEPTIONS) {
+ factory.register(exception.left, exception.middle, exception.right);
+ }
+ }
+
+ @NonNull
+ public static RuntimeException rethrow(@NonNull final Throwable t) { // never returns normally: every path throws
+ if (t instanceof Error) {
+ if (t instanceof OutOfMemoryError) {
+ OutOfMemoryErrorDispatcher.onOutOfMemory((OutOfMemoryError) t); // notify Hazelcast's dispatcher before rethrowing
+ }
+ throw (Error) t; // errors are rethrown unchanged, never wrapped
+ } else {
+ throw peeledAndUnchecked(t); // everything else is peeled and thrown as a RuntimeException
+ }
+ }
+
+ @NonNull
+ private static RuntimeException peeledAndUnchecked(@NonNull Throwable t) {
+ t = peel(t); // drop CompletionException / ExecutionException / InvocationTargetException wrappers
+
+ if (t instanceof RuntimeException) {
+ return (RuntimeException) t; // already unchecked: hand back as is
+ }
+
+ return new SeaTunnelEngineException(t); // any other throwable becomes the cause of a SeaTunnelEngineException
+ }
+
+ /**
+ * Repeatedly unwraps {@link CompletionException}, {@link ExecutionException} and
+ * {@link InvocationTargetException}, stopping when the cause is {@code null} or the throwable itself.
+ *
+ * @param t Throwable to peel
+ * @return the peeled throwable; {@code t} itself when it is not such a wrapper or has no usable cause
+ * @see #peeledAndUnchecked(Throwable)
+ */
+ public static Throwable peel(Throwable t) {
+ while ((t instanceof CompletionException
+ || t instanceof ExecutionException
+ || t instanceof InvocationTargetException)
+ && t.getCause() != null
+ && t.getCause() != t
+ ) {
+ t = t.getCause();
+ }
+ return t;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/pom.xml b/seatunnel-engine/seatunnel-engine-core/pom.xml
new file mode 100644
index 000000000..9a7e5cb51
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-engine</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-engine-core</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
new file mode 100644
index 000000000..75060fc90
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+
+/**
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions at https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+@Generated("4d1cbb254e8eaad3c45fe22c57f29492")
+public final class SeaTunnelPrintMessageCodec {
+ //hex: 0xDE0100
+ public static final int REQUEST_MESSAGE_TYPE = 14549248;
+ //hex: 0xDE0101
+ public static final int RESPONSE_MESSAGE_TYPE = 14549249;
+ private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+ private SeaTunnelPrintMessageCodec() {
+ }
+
+ public static ClientMessage encodeRequest(java.lang.String message) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(false);
+ clientMessage.setOperationName("SeaTunnel.PrintMessage");
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1); // -1 mirrors partitionIdentifier in SeaTunnelEngine.yaml
+ clientMessage.add(initialFrame);
+ StringCodec.encode(clientMessage, message);
+ return clientMessage;
+ }
+
+ /**
+ * Skips the initial frame of a PrintMessage request and decodes the {@code message} string that follows it.
+ */
+ public static java.lang.String decodeRequest(ClientMessage clientMessage) {
+ ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
+ //empty initial frame
+ iterator.next();
+ return StringCodec.decode(iterator);
+ }
+
+ public static ClientMessage encodeResponse(java.lang.String response) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+ clientMessage.add(initialFrame);
+
+ StringCodec.encode(clientMessage, response);
+ return clientMessage;
+ }
+
+ /**
+ * Skips the initial frame of a PrintMessage response and decodes the {@code response} string that follows it.
+ */
+ public static java.lang.String decodeResponse(ClientMessage clientMessage) {
+ ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
+ //empty initial frame
+ iterator.next();
+ return StringCodec.decode(iterator);
+ }
+}
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
similarity index 55%
copy from seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
copy to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 972b00a88..a16ea482f 100644
--- a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -5,11 +6,36 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.seatunnel.engine.server.NodeExtension
+#
+
+# The schema of this file can be found at: https://github.com/hazelcast/hazelcast-client-protocol
+id: 222
+name: SeaTunnel
+methods:
+ - id: 1
+ name: printMessage
+ since: 2.0
+ doc: ''
+ request:
+ retryable: false
+ partitionIdentifier: -1
+ params:
+ - name: message
+ type: String
+ nullable: false
+ since: 2.0
+ doc: ''
+ response:
+ params:
+ - name: response
+ type: String
+ nullable: false
+ since: 2.0
+ doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
new file mode 100644
index 000000000..b193e438a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-engine</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-engine-server</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
similarity index 96%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index f583a91ef..1bd5a83d4 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -28,7 +28,7 @@ public class NodeExtension extends DefaultNodeExtension {
public NodeExtension(Node node) {
super(node);
- extCommon = new NodeExtensionCommon(node, new Server(node));
+ extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node));
}
@Override
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
similarity index 94%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
index 8c6ab6dae..45ecaa3a6 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.engine.server;
import static com.hazelcast.cluster.ClusterState.PASSIVE;
+import org.apache.seatunnel.engine.common.Constant;
+
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.logging.ILogger;
@@ -41,9 +43,9 @@ class NodeExtensionCommon {
private final Node node;
private final ILogger logger;
- private final Server server;
+ private final SeaTunnelServer server;
- NodeExtensionCommon(Node node, Server server) {
+ NodeExtensionCommon(Node node, SeaTunnelServer server) {
this.node = node;
this.logger = node.getLogger(getClass().getName());
this.server = server;
@@ -94,7 +96,7 @@ class NodeExtensionCommon {
Map<String, Object> createExtensionServices() {
Map<String, Object> extensionServices = new HashMap<>();
- extensionServices.put(Server.SERVICE_NAME, server);
+ extensionServices.put(Constant.SEATUNNEL_SERVICE_NAME, server);
return extensionServices;
}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
similarity index 94%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 05822d331..9c112591f 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -21,7 +21,7 @@ import com.hazelcast.instance.impl.DefaultNodeContext;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
-public class NodeContext extends DefaultNodeContext {
+public class SeaTunnelNodeContext extends DefaultNodeContext {
@Override
public NodeExtension createNodeExtension(Node node) {
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
similarity index 94%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 90bbfc7c8..002b2558c 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -29,13 +29,13 @@ import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import java.util.Properties;
-public class Server implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
private NodeEngineImpl nodeEngine;
private final ILogger logger;
- public Server(Node node) {
+ public SeaTunnelServer(Node node) {
this.logger = node.getLogger(getClass());
logger.info("SeaTunnel server start...");
}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
similarity index 84%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 5749f4c3a..e40e5d727 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.engine.server;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-public class ServerStarter {
+public class SeaTunnelServerStarter {
public static void main(String[] args) {
Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+ config.getSecurityConfig().setEnabled(false);
+ config.getJetConfig().setEnabled(false);
+ HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/PrintMessageOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/PrintMessageOperation.java
new file mode 100644
index 000000000..df633ec9f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/PrintMessageOperation.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class PrintMessageOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+ private String message; // payload; the only field written by writeInternal and read back by readInternal
+
+ private String response; // set by run() from SeaTunnelServer#printMessage and returned by getResponse()
+
+ public PrintMessageOperation() { // NOTE(review): no-arg constructor presumably exists for deserialization — confirm
+ }
+
+ public PrintMessageOperation(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public final int getFactoryId() {
+ return OperationDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return OperationDataSerializerHook.PRINT_MESSAGE_OPERATOR;
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeString(message); // must stay in step with the read in readInternal
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ message = in.readString(); // must stay in step with the write in writeInternal
+ }
+
+ @Override
+ public void run() {
+ SeaTunnelServer service = getService(); // NOTE(review): presumably the service named in the invocation — confirm
+ response = service.printMessage(message);
+ }
+
+ @Override
+ public Object getResponse() {
+ return response;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
new file mode 100644
index 000000000..78dee775e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+
+import com.google.common.base.Function;
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.exception.RetryableHazelcastException;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.security.Permission;
+
+public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
+
+ protected PrintMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
+ super(clientMessage, node, connection);
+ }
+
+ @Override
+ protected InvocationBuilder getInvocationBuilder(Operation op) {
+ Address masterAddress = nodeEngine.getMasterAddress(); // the operation is always targeted at the master member
+ if (masterAddress == null) {
+ throw new RetryableHazelcastException("master not yet known"); // NOTE(review): presumably retried by the caller — confirm
+ }
+ return nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ op, masterAddress);
+ }
+
+ @Override
+ protected Operation prepareOperation() {
+ return new PrintMessageOperation(parameters); // NOTE(review): parameters is presumably the String from decodeClientMessage — confirm
+ }
+
+ @Override
+ protected String decodeClientMessage(ClientMessage clientMessage) {
+ Function<ClientMessage, String> decodeRequest = SeaTunnelPrintMessageCodec::decodeRequest;
+ return decodeRequest.apply(clientMessage); // same result as calling SeaTunnelPrintMessageCodec.decodeRequest directly
+ }
+
+ @Override
+ protected ClientMessage encodeResponse(Object response) {
+ Function<String, ClientMessage> encodeResponse = SeaTunnelPrintMessageCodec::encodeResponse;
+ return encodeResponse.apply((String) response); // the cast requires response to be a String (or null)
+ }
+
+ @Override
+ public String getServiceName() {
+ return Constant.SEATUNNEL_SERVICE_NAME;
+ }
+
+ @Override
+ public Permission getRequiredPermission() {
+ return null; // this task declares no required permission
+ }
+
+ @Override
+ public String getDistributedObjectName() {
+ return null; // this task declares no distributed object name
+ }
+
+ @Override
+ public String getMethodName() {
+ return "printMessage";
+ }
+
+ @Override
+ public Object[] getParameters() {
+ return new Object[0]; // always empty: the message itself is not reported here
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
new file mode 100644
index 000000000..9b25ecd32
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+
+import com.hazelcast.client.impl.protocol.MessageTaskFactory;
+import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.util.collection.Int2ObjectHashMap;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+/**
+ * Supplies the {@link MessageTaskFactory} instances for SeaTunnel client messages, keyed by
+ * request message type.
+ */
+public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryProvider {
+    /** Message-type to factory registry; populated once by the constructor. */
+    private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);
+    /** Node taken from the engine given to the constructor; passed to every task that is created. */
+    public final Node node;
+
+    /**
+     * Captures the node of the given engine and registers all task factories.
+     *
+     * @param nodeEngine the node engine; it is cast to {@link NodeEngineImpl}, so any other
+     *                   implementation fails with a {@link ClassCastException}
+     */
+    public SeaTunnelMessageTaskFactoryProvider(NodeEngine nodeEngine) {
+        final NodeEngineImpl engine = (NodeEngineImpl) nodeEngine;
+        node = engine.getNode();
+        registerFactories();
+    }
+
+    /** Returns the factory registry. The internal map itself is returned, not a copy. */
+    @Override
+    public Int2ObjectHashMap<MessageTaskFactory> getFactories() {
+        return factories;
+    }
+
+    /** Registers the factory for each supported request message type. */
+    private void registerFactories() {
+        final MessageTaskFactory printMessage =
+            (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection);
+        factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, printMessage);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
new file mode 100644
index 000000000..807602605
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+/**
+ * {@link DataSerializerHook} that contributes the factory used to instantiate SeaTunnel engine
+ * operations from their type id.
+ */
+public class OperationDataSerializerHook implements DataSerializerHook {
+    /** Type id that the factory below maps to {@link PrintMessageOperation}. */
+    public static final int PRINT_MESSAGE_OPERATOR = 0;
+
+    /**
+     * Factory id, resolved by {@code FactoryIdHelper.getFactoryId} from the property name and the
+     * default id declared in {@link OperationFactoryIdConstant}.
+     */
+    public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
+        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
+        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID);
+
+    /** Returns {@link #FACTORY_ID}. */
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    /** Returns a new factory instance on every call. */
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new Factory();
+    }
+
+    /** Creates operation instances by type id. */
+    private static class Factory implements DataSerializableFactory {
+        /**
+         * Instantiates the operation registered for {@code typeId} via its no-arg constructor.
+         *
+         * @param typeId the type id to look up
+         * @return a new operation instance
+         * @throws IllegalArgumentException if {@code typeId} is not a known type id
+         */
+        @SuppressWarnings("checkstyle:returncount")
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            if (typeId == PRINT_MESSAGE_OPERATOR) {
+                return new PrintMessageOperation();
+            }
+            throw new IllegalArgumentException("Unknown type id " + typeId);
+        }
+    }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
similarity index 56%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
index 5749f4c3a..e062bf67d 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.server.serializable;
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
- public static void main(String[] args) {
- Config config = new Config();
- HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
- }
+/**
+ * Constants used for Hazelcast's {@link com.hazelcast.nio.serialization.IdentifiedDataSerializable}
+ * mechanism.
+ *
+ * <p>{@code OperationDataSerializerHook} passes both values to {@code FactoryIdHelper.getFactoryId}
+ * to obtain its factory ID.
+ */
+public class OperationFactoryIdConstant {
+ /**
+ * Name of the system property that specifies SeaTunnelEngine's data serialization factory ID.
+ * NOTE(review): presumably a value set through this property overrides the default ID below;
+ * confirm against FactoryIdHelper.
+ */
+ public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY = "hazelcast.serialization.ds.seatunnel.engine.operation";
+ /** Default ID of SeaTunnelEngine's data serialization factory. */
+ public static final int SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID = -30001;
}
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
similarity index 84%
copy from seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
copy to seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
index 972b00a88..ab0062e85 100644
--- a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -5,11 +6,13 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.seatunnel.engine.server.NodeExtension
+#
+
+org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider
similarity index 84%
rename from seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
rename to seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider
index 972b00a88..972834b5d 100644
--- a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -5,11 +6,13 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.seatunnel.engine.server.NodeExtension
+#
+
+org.apache.seatunnel.engine.server.protocol.task.SeaTunnelMessageTaskFactoryProvider
diff --git a/tools/checkstyle/checkStyle.xml b/tools/checkstyle/checkStyle.xml
index ddff7c20f..bf6e419a4 100755
--- a/tools/checkstyle/checkStyle.xml
+++ b/tools/checkstyle/checkStyle.xml
@@ -539,4 +539,8 @@
value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF, INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF, CTOR_DEF"/>
</module>
</module>
+
+ <!--
+ Applies the suppressions listed in tools/checkstyle/suppressions.xml.
+ NOTE(review): the path is relative; presumably it is resolved against the directory the
+ build is started from. Confirm it still resolves when a single sub-module is built.
+ -->
+ <module name="SuppressionFilter">
+ <property name="file" value="tools/checkstyle/suppressions.xml" />
+ </module>
</module>
diff --git a/tools/checkstyle/suppressions.xml b/tools/checkstyle/suppressions.xml
new file mode 100644
index 000000000..04495ba44
--- /dev/null
+++ b/tools/checkstyle/suppressions.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.1//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+ <!--
+ The "checks" pattern matches any alphanumeric check name, so files whose path contains one
+ of the two package directories below are exempt from every Checkstyle check.
+ NOTE(review): presumably these packages hold generated or vendored client-protocol code; confirm.
+ -->
+ <suppress files="[\\/]org/apache/seatunnel/engine/core/protocol/codec" checks="[a-zA-Z0-9]*"/>
+ <suppress files="[\\/]org/apache/seatunnel/engine/core/protocol/compatibility" checks="[a-zA-Z0-9]*"/>
+</suppressions>
\ No newline at end of file