You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/27 10:49:22 UTC

[incubator-seatunnel] branch st-engine updated: [st-engine] Add engine client (#2223)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 1400f6b44 [st-engine] Add engine client (#2223)
1400f6b44 is described below

commit 1400f6b443cf6c1481a3187bd829d27179f6f73d
Author: Eric <ga...@gmail.com>
AuthorDate: Wed Jul 27 18:49:16 2022 +0800

    [st-engine] Add engine client (#2223)
    
    * tmp
    
    * tmp
    
    * tmp1
    
    * fix checkstyle error.
    
    * fix license header error
    
    * fix error
    
    * fix error
    
    * add source license
    
    * fix build error
    
    * add seatunnel-engine-core module
    
    * remove jet code
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix checkstyle
    
    * add seatunnel-engine-core to seatunnel-dist
    
    * merge 8250fc8b7b87b788d0d3869c009a9baa91a2f1d0
    
    * fix conflict
    
    * fix conflict
    
    * fix error
    
    * fix ci error
    
    * retry ci
    
    * resolve review
    
    * resolve review
    
    * fix review
    
    Co-authored-by: Kirs <ki...@apache.org>
---
 .github/workflows/backend.yml                      |  1 -
 LICENSE                                            |  4 +-
 generate_client_protocol.sh                        | 58 +++++++++++++
 pom.xml                                            | 12 +++
 .../seatunnel/api/transform/Transformation.java    | 12 +--
 .../src/main/assembly/assembly-bin-ci.xml          | 21 +++++
 seatunnel-dist/src/main/assembly/assembly-bin.xml  | 15 +++-
 seatunnel-engine/pom.xml                           | 28 ++-----
 seatunnel-engine/seatunnel-engine-client/pom.xml   | 67 +++++++++++++++
 .../engine/client/JobExecutionEnvironment.java     | 65 +++++++++++++++
 .../seatunnel/engine/client/SeaTunnelClient.java   | 92 +++++++++++++++++++++
 .../engine/client/SeaTunnelClientConfig.java}      | 18 ++--
 .../engine/client/SeaTunnelClientInstance.java}    | 20 +++--
 .../engine/client/SeaTunnelClientTest.java         | 55 +++++++++++++
 seatunnel-engine/seatunnel-engine-common/pom.xml   | 46 +++++++++++
 .../apache/seatunnel/engine/common/Constant.java}  | 13 +--
 .../seatunnel/engine/common/config/JobConfig.java} | 12 +--
 .../exception/JobNotFoundExceptionSeaTunnel.java}  | 17 ++--
 .../exception/SeaTunnelEngineException.java}       | 27 ++++--
 .../engine/common/utils/ExceptionUtil.java         | 96 ++++++++++++++++++++++
 seatunnel-engine/seatunnel-engine-core/pom.xml     | 63 ++++++++++++++
 .../protocol/codec/SeaTunnelPrintMessageCodec.java | 91 ++++++++++++++++++++
 .../SeaTunnelEngine.yaml}                          | 30 ++++++-
 seatunnel-engine/seatunnel-engine-server/pom.xml   | 52 ++++++++++++
 .../seatunnel/engine/server/NodeExtension.java     |  2 +-
 .../engine/server/NodeExtensionCommon.java         |  8 +-
 .../engine/server/SeaTunnelNodeContext.java}       |  2 +-
 .../seatunnel/engine/server/SeaTunnelServer.java}  |  4 +-
 .../engine/server/SeaTunnelServerStarter.java}     |  6 +-
 .../server/operation/PrintMessageOperation.java    | 75 +++++++++++++++++
 .../server/protocol/task/PrintMessageTask.java     | 93 +++++++++++++++++++++
 .../task/SeaTunnelMessageTaskFactoryProvider.java  | 46 +++++++++++
 .../serializable/OperationDataSerializerHook.java  | 57 +++++++++++++
 .../serializable/OperationFactoryIdConstant.java}  | 20 ++---
 .../services/com.hazelcast.DataSerializerHook}     |  7 +-
 ...lient.impl.protocol.MessageTaskFactoryProvider} |  7 +-
 tools/checkstyle/checkStyle.xml                    |  4 +
 tools/checkstyle/suppressions.xml                  | 27 ++++++
 38 files changed, 1165 insertions(+), 108 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index ee6620ec7..867afa9c0 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -16,7 +16,6 @@
 #
 
 name: Backend
-
 on:
   push:
   pull_request:
diff --git a/LICENSE b/LICENSE
index ca1720aba..09e15e69a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,4 +215,6 @@ seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
 seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java              from  https://github.com/lightbend/config
 seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java        from  https://github.com/lightbend/config
 seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java     from https://github.com/lightbend/config
-seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java     from https://github.com/lightbend/config
\ No newline at end of file
+seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/SimpleConfigObject.java     from https://github.com/lightbend/config
+generate_client_protocol.sh                                                                                                      from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java               from https://github.com/hazelcast/hazelcast
\ No newline at end of file
diff --git a/generate_client_protocol.sh b/generate_client_protocol.sh
new file mode 100755
index 000000000..2b26ecda3
--- /dev/null
+++ b/generate_client_protocol.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPT_DIR="$(dirname "$0")"
+SEATUNNEL_ENGINE_HOME="$(cd "$SCRIPT_DIR/"; pwd)"
+
+PYTHON="$(which python3 2>/dev/null)"
+PIP3="$(which pip3 2>/dev/null)"
+GIT="$(which git 2>/dev/null)"
+
+PROTOCOL_DIRECTORY=`mktemp -d 2>/dev/null || mktemp -d -t 'protocol'`
+
+if [ -z "$PYTHON" ]; then
+    echo "Python 3 could not be found in your system."
+    exit 1
+fi
+
+if [ -z "$PIP3" ]; then
+    echo "PIP 3 could not be found in your system."
+    exit 1
+fi
+
+if [ -z "$GIT" ]; then
+    echo "Git could not be found in your system."
+    exit 1
+fi
+
+echo $SCRIPT_DIR
+echo $SEATUNNEL_ENGINE_HOME
+echo $PROTOCOL_DIRECTORY
+
+$GIT clone --depth=1 https://github.com/hazelcast/hazelcast-client-protocol.git $PROTOCOL_DIRECTORY
+
+cd $PROTOCOL_DIRECTORY
+
+$PIP3 install -r requirements.txt
+
+$PYTHON generator.py -r $SEATUNNEL_ENGINE_HOME -p $SEATUNNEL_ENGINE_HOME/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition \
+-o seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec \
+-n org.apache.seatunnel.engine.core.protocol.codec --no-binary --no-id-check
+
+rm -rf $PROTOCOL_DIRECTORY
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4469c2d74..ab1634dec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,6 +220,8 @@
         <springfox-swagger.version>2.6.1</springfox-swagger.version>
         <swagger-annotations.version>1.5.10</swagger-annotations.version>
         <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
+
+        <!--  SeaTunnel Engine use     -->
         <hazelcast.version>5.1</hazelcast.version>
     </properties>
 
@@ -925,12 +927,22 @@
                 <version>${hibernate.validator.version}</version>
             </dependency>
 
+            <!-- SeaTunnel engine use begin -->
             <dependency>
                 <groupId>com.hazelcast</groupId>
                 <artifactId>hazelcast</artifactId>
                 <version>${hazelcast.version}</version>
             </dependency>
 
+            <!-- test dependencies -->
+            <dependency>
+                <groupId>com.hazelcast</groupId>
+                <artifactId>hazelcast</artifactId>
+                <scope>test</scope>
+                <version>${hazelcast.version}</version>
+                <classifier>tests</classifier>
+            </dependency>
+            <!-- SeaTunnel engine use end -->
         </dependencies>
     </dependencyManagement>
 
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/Transformation.java
similarity index 68%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/Transformation.java
index 5749f4c3a..c68be96df 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/Transformation.java
@@ -15,15 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.api.transform;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
-    }
+public interface Transformation {
 }
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 1f15cbc08..d4f7aaffa 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -118,6 +118,27 @@
             </excludes>
             <outputDirectory>/lib</outputDirectory>
         </fileSet>
+        <!-- seatunnel engine -->
+        <fileSet>
+            <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
+            <includes>
+                <include>seatunnel-engine-*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+            </excludes>
+            <outputDirectory>/lib</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+            <includes>
+                <include>seatunnel-engine-*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+            </excludes>
+            <outputDirectory>/lib</outputDirectory>
+        </fileSet>
         <!-- connectors -->
         <fileSet>
             <directory>../seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib</directory>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index bd2841102..eddaf8129 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -118,10 +118,21 @@
             </excludes>
             <outputDirectory>/lib</outputDirectory>
         </fileSet>
+        <!-- seatunnel engine -->
         <fileSet>
-            <directory>../seatunnel-engine/target</directory>
+            <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
             <includes>
-                <include>seatunnel-engine*.jar</include>
+                <include>seatunnel-engine-*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+            </excludes>
+            <outputDirectory>/lib</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+            <includes>
+                <include>seatunnel-engine-*.jar</include>
             </includes>
             <excludes>
                 <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
diff --git a/seatunnel-engine/pom.xml b/seatunnel-engine/pom.xml
index f3064079c..d5efd7b5a 100644
--- a/seatunnel-engine/pom.xml
+++ b/seatunnel-engine/pom.xml
@@ -17,30 +17,18 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>org.apache.seatunnel</groupId>
         <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>seatunnel-engine</artifactId>
     <packaging>pom</packaging>
+    <artifactId>seatunnel-engine</artifactId>
 
-    <dependencies>
-
-    <dependency>
-        <groupId>com.hazelcast</groupId>
-        <artifactId>hazelcast</artifactId>
-    </dependency>
-
-    <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-library</artifactId>
-        <scope>provided</scope>
-    </dependency>
-
-
-    </dependencies>
-
-
+    <modules>
+        <module>seatunnel-engine-client</module>
+        <module>seatunnel-engine-common</module>
+        <module>seatunnel-engine-server</module>
+        <module>seatunnel-engine-core</module>
+    </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
new file mode 100644
index 000000000..26c502f36
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-engine</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-engine-client</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-engine-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-engine-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
new file mode 100644
index 000000000..9cc0bfb61
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.api.transform.Transformation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JobExecutionEnvironment {
+
+    private List<Transformation> transformations;
+
+    private static String DEFAULT_JOB_NAME = "test_st_job";
+
+    private SeaTunnelClientConfig configuration;
+
+    private String jobName;
+
+    private int maxParallelism = 1;
+
+    public JobExecutionEnvironment(SeaTunnelClientConfig configuration) {
+        this.configuration = configuration;
+    }
+
+    public void addTransformation(Transformation transformation) {
+        if (transformations == null) {
+            transformations = new ArrayList<>();
+        }
+        this.transformations.add(transformation);
+    }
+
+    public List<Transformation> getTransformations() {
+        return transformations;
+    }
+
+    public void setTransformations(List<Transformation> transformations) {
+        this.transformations = transformations;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public void setMaxParallelism(int maxParallelism) {
+        this.maxParallelism = maxParallelism;
+    }
+}
+
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
new file mode 100644
index 000000000..652883a7d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
+import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.spi.impl.ClientInvocation;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.internal.serialization.SerializationService;
+import com.hazelcast.internal.util.Preconditions;
+import com.hazelcast.logging.ILogger;
+import lombok.NonNull;
+
+import java.util.UUID;
+import java.util.function.Function;
+
+public class SeaTunnelClient implements SeaTunnelClientInstance {
+    private final HazelcastClientInstanceImpl hazelcastClient;
+    private final SerializationService serializationService;
+
+    public SeaTunnelClient(@NonNull SeaTunnelClientConfig seaTunnelClientConfig) {
+        Preconditions.checkNotNull(seaTunnelClientConfig, "config");
+        this.hazelcastClient = ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(seaTunnelClientConfig)).client;
+        this.serializationService = hazelcastClient.getSerializationService();
+        ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
+    }
+
+    @NonNull
+    @Override
+    public HazelcastInstance getHazelcastInstance() {
+        return hazelcastClient;
+    }
+
+    @Override
+    public JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config) {
+        // TODO analyze job config file and create LocalExecutionContext
+        return null;
+    }
+
+    public ILogger getLogger() {
+        return hazelcastClient.getLoggingService().getLogger(getClass());
+    }
+
+    private <S> S invokeRequestOnMasterAndDecodeResponse(ClientMessage request,
+                                                         Function<ClientMessage, Object> decoder) {
+        UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
+        return invokeRequestAndDecodeResponse(masterUuid, request, decoder);
+    }
+
+    private <S> S invokeRequestOnAnyMemberAndDecodeResponse(ClientMessage request,
+                                                            Function<ClientMessage, Object> decoder) {
+        return invokeRequestAndDecodeResponse(null, request, decoder);
+    }
+
+    private <S> S invokeRequestAndDecodeResponse(UUID uuid, ClientMessage request,
+                                                 Function<ClientMessage, Object> decoder) {
+        ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
+        try {
+            ClientMessage response = invocation.invoke().get();
+            return serializationService.toObject(decoder.apply(response));
+        } catch (Throwable t) {
+            throw ExceptionUtil.rethrow(t);
+        }
+    }
+
+    public String printMessageToMaster(@NonNull String msg) {
+        return invokeRequestOnMasterAndDecodeResponse(
+            SeaTunnelPrintMessageCodec.encodeRequest(msg),
+            response -> SeaTunnelPrintMessageCodec.decodeResponse(response)
+        );
+    }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
similarity index 64%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
index 5749f4c3a..aa15c138a 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
@@ -15,15 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.client;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.client.config.ClientConfig;
 
-public class ServerStarter {
+public class SeaTunnelClientConfig extends ClientConfig {
 
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+    /**
+     * Creates a new config instance with default group name for SeaTunnel Engine
+     */
+    public SeaTunnelClientConfig() {
+        // TODO we should get cluster name from server config instead of return a constant name.
+        super();
+        setClusterName("SeaTunnel Engine");
     }
 }
+
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
similarity index 62%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 5749f4c3a..34ca898d9 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -15,15 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.client;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.NonNull;
 
-public class ServerStarter {
+public interface SeaTunnelClientInstance {
 
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
-    }
+    /**
+     * Returns the underlying Hazelcast IMDG instance used by SeaTunnel Engine Client. It will
+     * be a client, depending on the type of this
+     */
+    @NonNull
+    HazelcastInstance getHazelcastInstance();
+
+    JobExecutionEnvironment createExecutionContext(String filePath, SeaTunnelClientConfig config);
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
new file mode 100644
index 000000000..0870fa08c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@SuppressWarnings("checkstyle:MagicNumber")
+@RunWith(JUnit4.class)
+public class SeaTunnelClientTest {
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        Config config = new Config();
+        config.getSecurityConfig().setEnabled(false);
+        config.getJetConfig().setEnabled(false);
+        config.getNetworkConfig().setPort(50001);
+        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
+    }
+
+    @Test
+    public void testSayHello() {
+        SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+        seaTunnelClientConfig.setClusterName("dev");
+        seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+
+        String msg = "Hello world";
+        String s = engineClient.printMessageToMaster(msg);
+        Assert.assertEquals(msg, s);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
new file mode 100644
index 000000000..c62993d55
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-engine</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-engine-common</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
similarity index 68%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 5749f4c3a..cb0795ee4 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -15,15 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
-    }
+public class Constant {
+    public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";
 }
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
similarity index 68%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 5749f4c3a..d6cd39dc6 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -15,15 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common.config;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
-    }
+public class JobConfig {
 }
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
similarity index 64%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
index 5749f4c3a..1b4158886 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNotFoundExceptionSeaTunnel.java
@@ -15,15 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common.exception;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+public class JobNotFoundExceptionSeaTunnel extends SeaTunnelEngineException {
+    public JobNotFoundExceptionSeaTunnel(long jobId) {
+        super("Job with id " + jobId + " not found");
+    }
 
-public class ServerStarter {
+    public JobNotFoundExceptionSeaTunnel(String message) {
+        super(message);
+    }
 
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+    public JobNotFoundExceptionSeaTunnel(String message, Throwable cause) {
+        super(message, cause);
     }
 }
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineException.java
similarity index 52%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
copy to seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineException.java
index 05822d331..b258b7db3 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineException.java
@@ -15,16 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.common.exception;
 
-import com.hazelcast.instance.impl.DefaultNodeContext;
-import com.hazelcast.instance.impl.Node;
-import com.hazelcast.instance.impl.NodeExtension;
+import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
+import com.hazelcast.core.HazelcastException;
 
-public class NodeContext extends DefaultNodeContext {
+public class SeaTunnelEngineException extends HazelcastException implements ClientExceptionFactory.ExceptionFactory {
+    public SeaTunnelEngineException() {
+    }
+
+    public SeaTunnelEngineException(String message) {
+        super(message);
+    }
+
+    public SeaTunnelEngineException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SeaTunnelEngineException(Throwable cause) {
+        super(cause);
+    }
 
     @Override
-    public NodeExtension createNodeExtension(Node node) {
-        return new org.apache.seatunnel.engine.server.NodeExtension(node);
+    public Throwable createException(String s, Throwable throwable) {
+        return new SeaTunnelEngineException(s, throwable);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
new file mode 100644
index 000000000..af9b504b7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import org.apache.seatunnel.engine.common.exception.JobNotFoundExceptionSeaTunnel;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+
+import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
+import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
+import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
+import lombok.NonNull;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+public final class ExceptionUtil {
+
+    private static final List<ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory>> EXCEPTIONS = Arrays.asList(
+        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START, SeaTunnelEngineException.class, SeaTunnelEngineException::new),
+        new ImmutableTriple<>(ClientProtocolErrorCodes.USER_EXCEPTIONS_RANGE_START + 1, JobNotFoundExceptionSeaTunnel.class, JobNotFoundExceptionSeaTunnel::new)
+    );
+
+    private ExceptionUtil() {
+    }
+
+    /**
+     * Called during startup to make our exceptions known to Hazelcast serialization
+     */
+    public static void registerSeaTunnelExceptions(@NonNull ClientExceptionFactory factory) {
+        for (ImmutableTriple<Integer, Class<? extends Throwable>, ClientExceptionFactory.ExceptionFactory> exception : EXCEPTIONS) {
+            factory.register(exception.left, exception.middle, exception.right);
+        }
+    }
+
+    @NonNull
+    public static RuntimeException rethrow(@NonNull final Throwable t) {
+        if (t instanceof Error) {
+            if (t instanceof OutOfMemoryError) {
+                OutOfMemoryErrorDispatcher.onOutOfMemory((OutOfMemoryError) t);
+            }
+            throw (Error) t;
+        } else {
+            throw peeledAndUnchecked(t);
+        }
+    }
+
+    @NonNull
+    private static RuntimeException peeledAndUnchecked(@NonNull Throwable t) {
+        t = peel(t);
+
+        if (t instanceof RuntimeException) {
+            return (RuntimeException) t;
+        }
+
+        return new SeaTunnelEngineException(t);
+    }
+
+    /**
+     * If {@code t} is either of {@link CompletionException}, {@link ExecutionException}
+     * or {@link InvocationTargetException}, returns its cause, peeling it recursively.
+     * Otherwise returns {@code t}.
+     *
+     * @param t Throwable to peel
+     * @see #peeledAndUnchecked(Throwable)
+     */
+    public static Throwable peel(Throwable t) {
+        while ((t instanceof CompletionException
+            || t instanceof ExecutionException
+            || t instanceof InvocationTargetException)
+            && t.getCause() != null
+            && t.getCause() != t
+        ) {
+            t = t.getCause();
+        }
+        return t;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/pom.xml b/seatunnel-engine/seatunnel-engine-core/pom.xml
new file mode 100644
index 000000000..9a7e5cb51
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-engine</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-engine-core</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-engine-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
new file mode 100644
index 000000000..75060fc90
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelPrintMessageCodec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+
+/**
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+@Generated("4d1cbb254e8eaad3c45fe22c57f29492")
+public final class SeaTunnelPrintMessageCodec {
+    //hex: 0xDE0100
+    public static final int REQUEST_MESSAGE_TYPE = 14549248;
+    //hex: 0xDE0101
+    public static final int RESPONSE_MESSAGE_TYPE = 14549249;
+    private static final int REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+    private SeaTunnelPrintMessageCodec() {
+    }
+
+    public static ClientMessage encodeRequest(java.lang.String message) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(false);
+        clientMessage.setOperationName("SeaTunnel.PrintMessage");
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        clientMessage.add(initialFrame);
+        StringCodec.encode(clientMessage, message);
+        return clientMessage;
+    }
+
+    /**
+     *
+     */
+    public static java.lang.String decodeRequest(ClientMessage clientMessage) {
+        ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
+        //empty initial frame
+        iterator.next();
+        return StringCodec.decode(iterator);
+    }
+
+    public static ClientMessage encodeResponse(java.lang.String response) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+        clientMessage.add(initialFrame);
+
+        StringCodec.encode(clientMessage, response);
+        return clientMessage;
+    }
+
+    /**
+     *
+     */
+    public static java.lang.String decodeResponse(ClientMessage clientMessage) {
+        ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
+        //empty initial frame
+        iterator.next();
+        return StringCodec.decode(iterator);
+    }
+}
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
similarity index 55%
copy from seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
copy to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 972b00a88..a16ea482f 100644
--- a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -5,11 +6,36 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.seatunnel.engine.server.NodeExtension
+#
+
+# The schema of this file can find from this link: https://github.com/hazelcast/hazelcast-client-protocol
+id: 222
+name: SeaTunnel
+methods:
+  - id: 1
+    name: printMessage
+    since: 2.0
+    doc: ''
+    request:
+      retryable: false
+      partitionIdentifier: -1
+      params:
+        - name: message
+          type: String
+          nullable: false
+          since: 2.0
+          doc: ''
+    response:
+      params:
+        - name: response
+          type: String
+          nullable: false
+          since: 2.0
+          doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml
new file mode 100644
index 000000000..b193e438a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-engine</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-engine-server</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-engine-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
similarity index 96%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index f583a91ef..1bd5a83d4 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -28,7 +28,7 @@ public class NodeExtension extends DefaultNodeExtension {
 
     public NodeExtension(Node node) {
         super(node);
-        extCommon = new NodeExtensionCommon(node, new Server(node));
+        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(node));
     }
 
     @Override
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
similarity index 94%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
index 8c6ab6dae..45ecaa3a6 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.engine.server;
 
 import static com.hazelcast.cluster.ClusterState.PASSIVE;
 
+import org.apache.seatunnel.engine.common.Constant;
+
 import com.hazelcast.cluster.ClusterState;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.logging.ILogger;
@@ -41,9 +43,9 @@ class NodeExtensionCommon {
 
     private final Node node;
     private final ILogger logger;
-    private final Server server;
+    private final SeaTunnelServer server;
 
-    NodeExtensionCommon(Node node, Server server) {
+    NodeExtensionCommon(Node node, SeaTunnelServer server) {
         this.node = node;
         this.logger = node.getLogger(getClass().getName());
         this.server = server;
@@ -94,7 +96,7 @@ class NodeExtensionCommon {
     Map<String, Object> createExtensionServices() {
         Map<String, Object> extensionServices = new HashMap<>();
 
-        extensionServices.put(Server.SERVICE_NAME, server);
+        extensionServices.put(Constant.SEATUNNEL_SERVICE_NAME, server);
 
         return extensionServices;
     }
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
similarity index 94%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 05822d331..9c112591f 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -21,7 +21,7 @@ import com.hazelcast.instance.impl.DefaultNodeContext;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.instance.impl.NodeExtension;
 
-public class NodeContext extends DefaultNodeContext {
+public class SeaTunnelNodeContext extends DefaultNodeContext {
 
     @Override
     public NodeExtension createNodeExtension(Node node) {
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
similarity index 94%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 90bbfc7c8..002b2558c 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -29,13 +29,13 @@ import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 
 import java.util.Properties;
 
-public class Server implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
     public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
 
     private NodeEngineImpl nodeEngine;
     private final ILogger logger;
 
-    public Server(Node node) {
+    public SeaTunnelServer(Node node) {
         this.logger = node.getLogger(getClass());
         logger.info("SeaTunnel server start...");
     }
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
similarity index 84%
copy from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 5749f4c3a..e40e5d727 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.engine.server;
 import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 
-public class ServerStarter {
+public class SeaTunnelServerStarter {
 
     public static void main(String[] args) {
         Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+        config.getSecurityConfig().setEnabled(false);
+        config.getJetConfig().setEnabled(false);
+        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new SeaTunnelNodeContext());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/PrintMessageOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/PrintMessageOperation.java
new file mode 100644
index 000000000..df633ec9f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/PrintMessageOperation.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class PrintMessageOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+    private String message;
+
+    private String response;
+
+    public PrintMessageOperation() {
+    }
+
+    public PrintMessageOperation(String message) {
+        this.message = message;
+    }
+
+    @Override
+    public final int getFactoryId() {
+        return OperationDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return OperationDataSerializerHook.PRINT_MESSAGE_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeString(message);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        message = in.readString();
+    }
+
+    @Override
+    public void run() {
+        SeaTunnelServer service = getService();
+        response = service.printMessage(message);
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
new file mode 100644
index 000000000..78dee775e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/PrintMessageTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+
+import com.google.common.base.Function;
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.exception.RetryableHazelcastException;
+import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.security.Permission;
+
+public class PrintMessageTask extends AbstractInvocationMessageTask<String> {
+
+    protected PrintMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
+        super(clientMessage, node, connection);
+    }
+
+    @Override
+    protected InvocationBuilder getInvocationBuilder(Operation op) {
+        Address masterAddress = nodeEngine.getMasterAddress();
+        if (masterAddress == null) {
+            throw new RetryableHazelcastException("master not yet known");
+        }
+        return nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+            op, masterAddress);
+    }
+
+    @Override
+    protected Operation prepareOperation() {
+        return new PrintMessageOperation(parameters);
+    }
+
+    @Override
+    protected String decodeClientMessage(ClientMessage clientMessage) {
+        Function<ClientMessage, String> decodeRequest = SeaTunnelPrintMessageCodec::decodeRequest;
+        return decodeRequest.apply(clientMessage);
+    }
+
+    @Override
+    protected ClientMessage encodeResponse(Object response) {
+        Function<String, ClientMessage> encodeResponse = SeaTunnelPrintMessageCodec::encodeResponse;
+        return encodeResponse.apply((String) response);
+    }
+
+    @Override
+    public String getServiceName() {
+        return Constant.SEATUNNEL_SERVICE_NAME;
+    }
+
+    @Override
+    public Permission getRequiredPermission() {
+        return null;
+    }
+
+    @Override
+    public String getDistributedObjectName() {
+        return null;
+    }
+
+    @Override
+    public String getMethodName() {
+        return "printMessage";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
new file mode 100644
index 000000000..9b25ecd32
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+
+import com.hazelcast.client.impl.protocol.MessageTaskFactory;
+import com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.util.collection.Int2ObjectHashMap;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryProvider {
+    private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);
+    public final Node node;
+
+    public SeaTunnelMessageTaskFactoryProvider(NodeEngine nodeEngine) {
+        this.node = ((NodeEngineImpl) nodeEngine).getNode();
+        initFactories();
+    }
+
+    @Override
+    public Int2ObjectHashMap<MessageTaskFactory> getFactories() {
+        return this.factories;
+    }
+
+    private void initFactories() {
+        factories.put(SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new PrintMessageTask(clientMessage, node, connection));
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
new file mode 100644
index 000000000..807602605
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.serializable;
+
+import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class OperationDataSerializerHook implements DataSerializerHook {
+    public static final int PRINT_MESSAGE_OPERATOR = 0;
+
+    public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
+        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
+        OperationFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
+    );
+
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new Factory();
+    }
+
+    private static class Factory implements DataSerializableFactory {
+        @SuppressWarnings("checkstyle:returncount")
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            switch (typeId) {
+                case PRINT_MESSAGE_OPERATOR:
+                    return new PrintMessageOperation();
+                default:
+                    throw new IllegalArgumentException("Unknown type id " + typeId);
+            }
+        }
+    }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
similarity index 56%
rename from seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
index 5749f4c3a..e062bf67d 100644
--- a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationFactoryIdConstant.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.server.serializable;
 
-import com.hazelcast.config.Config;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-
-public class ServerStarter {
-
-    public static void main(String[] args) {
-        Config config = new Config();
-        HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
-    }
+/**
+ * Constants used for Hazelcast's {@link com.hazelcast.nio.serialization.IdentifiedDataSerializable}
+ * mechanism.
+ */
+public class OperationFactoryIdConstant {
+    /** Name of the system property that specifies SeaTunnelEngine's data serialization factory ID. */
+    public static final String SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY = "hazelcast.serialization.ds.seatunnel.engine.operation";
+    /** Default ID of SeaTunnelEngine's data serialization factory. */
+    public static final int SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID = -30001;
 }
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
similarity index 84%
copy from seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
copy to seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
index 972b00a88..ab0062e85 100644
--- a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.DataSerializerHook
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -5,11 +6,13 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.seatunnel.engine.server.NodeExtension
+#
+
+org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider
similarity index 84%
rename from seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
rename to seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider
index 972b00a88..972834b5d 100644
--- a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.client.impl.protocol.MessageTaskFactoryProvider
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -5,11 +6,13 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.seatunnel.engine.server.NodeExtension
+#
+
+org.apache.seatunnel.engine.server.protocol.task.SeaTunnelMessageTaskFactoryProvider
diff --git a/tools/checkstyle/checkStyle.xml b/tools/checkstyle/checkStyle.xml
index ddff7c20f..bf6e419a4 100755
--- a/tools/checkstyle/checkStyle.xml
+++ b/tools/checkstyle/checkStyle.xml
@@ -539,4 +539,8 @@
                       value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF, INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF, CTOR_DEF"/>
         </module>
     </module>
+
+    <module name="SuppressionFilter">
+        <property name="file" value="tools/checkstyle/suppressions.xml" />
+    </module>
 </module>
diff --git a/tools/checkstyle/suppressions.xml b/tools/checkstyle/suppressions.xml
new file mode 100644
index 000000000..04495ba44
--- /dev/null
+++ b/tools/checkstyle/suppressions.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<!DOCTYPE suppressions PUBLIC
+        "-//Puppy Crawl//DTD Suppressions 1.1//EN"
+        "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+    <suppress files="[\\/]org/apache/seatunnel/engine/core/protocol/codec" checks="[a-zA-Z0-9]*"/>
+    <suppress files="[\\/]org/apache/seatunnel/engine/core/protocol/compatibility" checks="[a-zA-Z0-9]*"/>
+</suppressions>
\ No newline at end of file