You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/19 08:15:31 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector V2] expose configurable options in Cassandra (#3681)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 73f63a504 [Feature][Connector V2] expose configurable options in Cassandra (#3681)
73f63a504 is described below

commit 73f63a5044856907b08ea16fb3d821bc6534a5eb
Author: Cason-ACE <35...@users.noreply.github.com>
AuthorDate: Fri May 19 16:15:22 2023 +0800

    [Feature][Connector V2] expose configurable options in Cassandra (#3681)
---
 docs/en/connector-v2/sink/Cassandra.md             |  26 ++---
 .../cassandra/config/CassandraConfig.java          | 127 +++++++--------------
 .../cassandra/config/CassandraParameters.java      |  97 ++++++++++++++++
 .../seatunnel/cassandra/sink/CassandraSink.java    |  34 +++---
 .../cassandra/sink/CassandraSinkFactory.java       |  48 ++++++++
 .../cassandra/sink/CassandraSinkWriter.java        |  36 +++---
 .../cassandra/source/CassandraSource.java          |  29 ++---
 .../cassandra/source/CassandraSourceFactory.java   |  48 ++++++++
 .../cassandra/source/CassandraSourceReader.java    |  23 ++--
 .../seatunnel/cassandra/CassandraFactoryTest.java  |  33 ++++++
 10 files changed, 341 insertions(+), 160 deletions(-)

diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md
index 6c0736889..73c6d3aba 100644
--- a/docs/en/connector-v2/sink/Cassandra.md
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -12,19 +12,19 @@ Write data to Apache Cassandra.
 
 ## Options
 
-|       name        |  type  | required | default value |
-|-------------------|--------|----------|---------------|
-| host              | String | Yes      | -             |
-| keyspace          | String | Yes      | -             |
-| table             | String | Yes      | -             |
-| username          | String | No       | -             |
-| password          | String | No       | -             |
-| datacenter        | String | No       | datacenter1   |
-| consistency_level | String | No       | LOCAL_ONE     |
-| fields            | String | No       | LOCAL_ONE     |
-| batch_size        | String | No       | 5000          |
-| batch_type        | String | No       | UNLOGGER      |
-| async_write       | String | No       | true          |
+|       name        |  type   | required | default value |
+|-------------------|---------|----------|---------------|
+| host              | String  | Yes      | -             |
+| keyspace          | String  | Yes      | -             |
+| table             | String  | Yes      | -             |
+| username          | String  | No       | -             |
+| password          | String  | No       | -             |
+| datacenter        | String  | No       | datacenter1   |
+| consistency_level | String  | No       | LOCAL_ONE     |
+| fields            | String  | No       | LOCAL_ONE     |
+| batch_size        | int     | No       | 5000          |
+| batch_type        | String  | No       | UNLOGGED      |
+| async_write       | boolean | No       | true          |
 
 ### host [string]
 
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
index 46e2cf776..7f38dc27d 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
@@ -17,99 +17,50 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
-import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-import lombok.ToString;
+public class CassandraConfig {
 
-import java.io.Serializable;
-import java.util.List;
+    public static final Integer DEFAULT_BATCH_SIZE = 5000;
 
-@Data
-@ToString
-@NoArgsConstructor
-public class CassandraConfig implements Serializable {
+    public static final Option<String> HOST =
+            Options.key("host").stringType().noDefaultValue().withDescription("");
 
-    public static final String HOST = "host";
-    public static final String USERNAME = "username";
-    public static final String PASSWORD = "password";
-    public static final String DATACENTER = "datacenter";
-    public static final String KEYSPACE = "keyspace";
-    public static final String TABLE = "table";
-    public static final String CQL = "cql";
-    public static final String FIELDS = "fields";
-    public static final String CONSISTENCY_LEVEL = "consistency_level";
-    public static final String BATCH_SIZE = "batch_size";
-    public static final String BATCH_TYPE = "batch_type";
-    public static final String ASYNC_WRITE = "async_write";
+    public static final Option<String> KEYSPACE =
+            Options.key("keyspace").stringType().noDefaultValue().withDescription("");
 
-    private String host;
-    private String username;
-    private String password;
-    private String datacenter;
-    private String keyspace;
-    private String table;
-    private String cql;
-    private List<String> fields;
-    private ConsistencyLevel consistencyLevel;
-    private Integer batchSize;
-    private DefaultBatchType batchType;
-    private Boolean asyncWrite;
+    public static final Option<String> USERNAME =
+            Options.key("username").stringType().noDefaultValue().withDescription("");
+    public static final Option<String> PASSWORD =
+            Options.key("password").stringType().noDefaultValue().withDescription("");
+    public static final Option<String> DATACENTER =
+            Options.key("datacenter").stringType().defaultValue("datacenter1").withDescription("");
 
-    public CassandraConfig(@NonNull String host, @NonNull String keyspace) {
-        this.host = host;
-        this.keyspace = keyspace;
-    }
+    public static final Option<String> CONSISTENCY_LEVEL =
+            Options.key("consistency_level")
+                    .stringType()
+                    .defaultValue("LOCAL_ONE")
+                    .withDescription("");
 
-    public static CassandraConfig getCassandraConfig(Config config) {
-        CassandraConfig cassandraConfig =
-                new CassandraConfig(config.getString(HOST), config.getString(KEYSPACE));
-        if (config.hasPath(USERNAME)) {
-            cassandraConfig.setUsername(config.getString(USERNAME));
-        }
-        if (config.hasPath(PASSWORD)) {
-            cassandraConfig.setPassword(config.getString(PASSWORD));
-        }
-        if (config.hasPath(DATACENTER)) {
-            cassandraConfig.setDatacenter(config.getString(DATACENTER));
-        } else {
-            cassandraConfig.setDatacenter("datacenter1");
-        }
-        if (config.hasPath(TABLE)) {
-            cassandraConfig.setTable(config.getString(TABLE));
-        }
-        if (config.hasPath(CQL)) {
-            cassandraConfig.setCql(config.getString(CQL));
-        }
-        if (config.hasPath(FIELDS)) {
-            cassandraConfig.setFields(config.getStringList(FIELDS));
-        }
-        if (config.hasPath(CONSISTENCY_LEVEL)) {
-            cassandraConfig.setConsistencyLevel(
-                    DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL)));
-        } else {
-            cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE);
-        }
-        if (config.hasPath(BATCH_SIZE)) {
-            cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE));
-        } else {
-            cassandraConfig.setBatchSize(Integer.parseInt("5000"));
-        }
-        if (config.hasPath(BATCH_TYPE)) {
-            cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE)));
-        } else {
-            cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED);
-        }
-        if (config.hasPath(ASYNC_WRITE)) {
-            cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE));
-        } else {
-            cassandraConfig.setAsyncWrite(true);
-        }
-        return cassandraConfig;
-    }
+    public static final Option<String> TABLE =
+            Options.key("table").stringType().noDefaultValue().withDescription("");
+
+    public static final Option<String> FIELDS =
+            Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(DEFAULT_BATCH_SIZE)
+                    .withDescription("");
+
+    public static final Option<String> BATCH_TYPE =
+            Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");
+
+    public static final Option<Boolean> ASYNC_WRITE =
+            Options.key("async_write").booleanType().defaultValue(true).withDescription("");
+
+    public static final Option<String> CQL =
+            Options.key("cql").stringType().noDefaultValue().withDescription("");
 }
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
new file mode 100644
index 000000000..14a66c6ee
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Setter
+@Getter
+public class CassandraParameters implements Serializable {
+    private String host;
+    private String username;
+    private String password;
+    private String datacenter;
+    private String keyspace;
+    private String table;
+    private String cql;
+    private List<String> fields;
+    private ConsistencyLevel consistencyLevel;
+    private Integer batchSize;
+    private DefaultBatchType batchType;
+    private Boolean asyncWrite;
+
+    public void buildWithConfig(Config config) {
+        this.host = config.getString(CassandraConfig.HOST.key());
+        this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());
+
+        if (config.hasPath(CassandraConfig.USERNAME.key())) {
+            this.username = config.getString(CassandraConfig.USERNAME.key());
+        }
+        if (config.hasPath(CassandraConfig.PASSWORD.key())) {
+            this.password = config.getString(CassandraConfig.PASSWORD.key());
+        }
+        if (config.hasPath(CassandraConfig.DATACENTER.key())) {
+            this.datacenter = config.getString(CassandraConfig.DATACENTER.key());
+        } else {
+            this.datacenter = CassandraConfig.DATACENTER.defaultValue();
+        }
+        if (config.hasPath(CassandraConfig.TABLE.key())) {
+            this.table = config.getString(CassandraConfig.TABLE.key());
+        }
+        if (config.hasPath(CassandraConfig.CQL.key())) {
+            this.cql = config.getString(CassandraConfig.CQL.key());
+        }
+        if (config.hasPath(CassandraConfig.FIELDS.key())) {
+            this.fields = config.getStringList(CassandraConfig.FIELDS.key());
+        }
+        if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
+            this.consistencyLevel =
+                    DefaultConsistencyLevel.valueOf(
+                            config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
+        } else {
+            this.consistencyLevel =
+                    DefaultConsistencyLevel.valueOf(
+                            CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
+        }
+        if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
+            this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
+        } else {
+            this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
+        }
+        if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
+            this.batchType =
+                    DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
+        } else {
+            this.batchType = DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
+        }
+        if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
+            this.asyncWrite = config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
+        } else {
+            this.asyncWrite = true;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index c935c7a53..748280e66 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -51,8 +51,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.Cassand
 @AutoService(SeaTunnelSink.class)
 public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private CassandraConfig cassandraConfig;
-
+    private final CassandraParameters cassandraParameters = new CassandraParameters();
     private SeaTunnelRowType seaTunnelRowType;
 
     private ColumnDefinitions tableSchema;
@@ -63,8 +62,10 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult checkResult =
+                CheckConfigUtil.checkAllExists(
+                        pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
         if (!checkResult.isSuccess()) {
             throw new CassandraConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -72,23 +73,24 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SINK, checkResult.getMsg()));
         }
-        this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+        this.cassandraParameters.buildWithConfig(pluginConfig);
         try (CqlSession session =
                 CassandraClient.getCqlSessionBuilder(
-                                cassandraConfig.getHost(),
-                                cassandraConfig.getKeyspace(),
-                                cassandraConfig.getUsername(),
-                                cassandraConfig.getPassword(),
-                                cassandraConfig.getDatacenter())
+                                cassandraParameters.getHost(),
+                                cassandraParameters.getKeyspace(),
+                                cassandraParameters.getUsername(),
+                                cassandraParameters.getPassword(),
+                                cassandraParameters.getDatacenter())
                         .build()) {
-            List<String> fields = cassandraConfig.getFields();
-            this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable());
+            List<String> fields = cassandraParameters.getFields();
+            this.tableSchema =
+                    CassandraClient.getTableSchema(session, pluginConfig.getString(TABLE.key()));
             if (fields == null || fields.isEmpty()) {
                 List<String> newFields = new ArrayList<>();
                 for (int i = 0; i < tableSchema.size(); i++) {
                     newFields.add(tableSchema.get(i).getName().asInternal());
                 }
-                cassandraConfig.setFields(newFields);
+                this.cassandraParameters.setFields(newFields);
             } else {
                 for (String field : fields) {
                     if (!tableSchema.contains(field)) {
@@ -97,7 +99,7 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
                                 "Field "
                                         + field
                                         + " does not exist in table "
-                                        + config.getString(TABLE));
+                                        + pluginConfig.getString(TABLE.key()));
                     }
                 }
             }
@@ -123,6 +125,6 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
             throws IOException {
-        return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, tableSchema);
+        return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, tableSchema);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
new file mode 100644
index 000000000..471847bbc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class CassandraSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Cassandra";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, CassandraConfig.TABLE)
+                .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
+                .optional(
+                        CassandraConfig.DATACENTER,
+                        CassandraConfig.CONSISTENCY_LEVEL,
+                        CassandraConfig.FIELDS,
+                        CassandraConfig.BATCH_SIZE,
+                        CassandraConfig.BATCH_TYPE,
+                        CassandraConfig.ASYNC_WRITE)
+                .build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
index 6565909a2..394b2b46b 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Slf4j
 public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
-    private final CassandraConfig cassandraConfig;
+    private final CassandraParameters cassandraParameters;
     private final SeaTunnelRowType seaTunnelRowType;
     private final ColumnDefinitions tableSchema;
     private final CqlSession session;
@@ -57,21 +57,21 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
     private final AtomicInteger counter = new AtomicInteger(0);
 
     public CassandraSinkWriter(
-            CassandraConfig cassandraConfig,
+            CassandraParameters cassandraParameters,
             SeaTunnelRowType seaTunnelRowType,
             ColumnDefinitions tableSchema) {
-        this.cassandraConfig = cassandraConfig;
+        this.cassandraParameters = cassandraParameters;
         this.seaTunnelRowType = seaTunnelRowType;
         this.tableSchema = tableSchema;
         this.session =
                 CassandraClient.getCqlSessionBuilder(
-                                cassandraConfig.getHost(),
-                                cassandraConfig.getKeyspace(),
-                                cassandraConfig.getUsername(),
-                                cassandraConfig.getPassword(),
-                                cassandraConfig.getDatacenter())
+                                cassandraParameters.getHost(),
+                                cassandraParameters.getKeyspace(),
+                                cassandraParameters.getUsername(),
+                                cassandraParameters.getPassword(),
+                                cassandraParameters.getDatacenter())
                         .build();
-        this.batchStatement = BatchStatement.builder(cassandraConfig.getBatchType()).build();
+        this.batchStatement = BatchStatement.builder(cassandraParameters.getBatchType()).build();
         this.boundStatementList = new ArrayList<>();
         this.completionStages = new ArrayList<>();
         this.preparedStatement = session.prepare(initPrepareCQL());
@@ -81,14 +81,14 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
     public void write(SeaTunnelRow row) throws IOException {
         BoundStatement boundStatement = this.preparedStatement.bind();
         addIntoBatch(row, boundStatement);
-        if (counter.getAndIncrement() >= cassandraConfig.getBatchSize()) {
+        if (counter.getAndIncrement() >= cassandraParameters.getBatchSize()) {
             flush();
             counter.set(0);
         }
     }
 
     private void flush() {
-        if (cassandraConfig.getAsyncWrite()) {
+        if (cassandraParameters.getAsyncWrite()) {
             completionStages.forEach(
                     resultStage ->
                             resultStage.whenComplete(
@@ -115,14 +115,14 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
 
     private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) {
         try {
-            for (int i = 0; i < cassandraConfig.getFields().size(); i++) {
-                String fieldName = cassandraConfig.getFields().get(i);
+            for (int i = 0; i < cassandraParameters.getFields().size(); i++) {
+                String fieldName = cassandraParameters.getFields().get(i);
                 DataType dataType = tableSchema.get(i).getType();
                 Object fieldValue = row.getField(seaTunnelRowType.indexOf(fieldName));
                 boundStatement =
                         TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue);
             }
-            if (cassandraConfig.getAsyncWrite()) {
+            if (cassandraParameters.getAsyncWrite()) {
                 completionStages.add(session.executeAsync(boundStatement));
             } else {
                 boundStatementList.add(boundStatement);
@@ -134,12 +134,12 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
     }
 
     private String initPrepareCQL() {
-        String[] placeholder = new String[cassandraConfig.getFields().size()];
+        String[] placeholder = new String[cassandraParameters.getFields().size()];
         Arrays.fill(placeholder, "?");
         return String.format(
                 "INSERT INTO %s (%s) VALUES (%s)",
-                cassandraConfig.getTable(),
-                String.join(",", cassandraConfig.getFields()),
+                cassandraParameters.getTable(),
+                String.join(",", cassandraParameters.getFields()),
                 String.join(",", placeholder));
     }
 
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
index 77ddd1f25..6a1885967 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
@@ -53,7 +53,7 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
         implements SupportColumnProjection {
 
     private SeaTunnelRowType rowTypeInfo;
-    private CassandraConfig cassandraConfig;
+    private final CassandraParameters cassandraParameters = new CassandraParameters();
 
     @Override
     public String getPluginName() {
@@ -61,8 +61,9 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, CQL);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult checkResult =
+                CheckConfigUtil.checkAllExists(pluginConfig, HOST.key(), KEYSPACE.key(), CQL.key());
         if (!checkResult.isSuccess()) {
             throw new CassandraConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -70,26 +71,26 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, checkResult.getMsg()));
         }
-        this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+        this.cassandraParameters.buildWithConfig(pluginConfig);
         try (CqlSession currentSession =
                 CassandraClient.getCqlSessionBuilder(
-                                cassandraConfig.getHost(),
-                                cassandraConfig.getKeyspace(),
-                                cassandraConfig.getUsername(),
-                                cassandraConfig.getPassword(),
-                                cassandraConfig.getDatacenter())
+                                pluginConfig.getString(HOST.key()),
+                                pluginConfig.getString(KEYSPACE.key()),
+                                cassandraParameters.getUsername(),
+                                cassandraParameters.getPassword(),
+                                cassandraParameters.getDatacenter())
                         .build()) {
             Row rs =
                     currentSession
                             .execute(
                                     CassandraClient.createSimpleStatement(
-                                            cassandraConfig.getCql(),
-                                            cassandraConfig.getConsistencyLevel()))
+                                            pluginConfig.getString(CQL.key()),
+                                            cassandraParameters.getConsistencyLevel()))
                             .one();
             if (rs == null) {
                 throw new CassandraConnectorException(
                         CassandraConnectorErrorCode.NO_DATA_IN_SOURCE_TABLE,
-                        "No data select from this cql: " + cassandraConfig.getCql());
+                        "No data select from this cql: " + pluginConfig.getConfig(CQL.key()));
             }
             int columnSize = rs.getColumnDefinitions().size();
             String[] fieldNames = new String[columnSize];
@@ -121,6 +122,6 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(
             SingleSplitReaderContext readerContext) throws Exception {
-        return new CassandraSourceReader(cassandraConfig, readerContext);
+        return new CassandraSourceReader(cassandraParameters, readerContext);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
new file mode 100644
index 000000000..9c1541115
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class CassandraSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Cassandra";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, CassandraConfig.CQL)
+                .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
+                .optional(CassandraConfig.DATACENTER, CassandraConfig.CONSISTENCY_LEVEL)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return CassandraSource.class;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
index 2de5d661c..6e4917ca9 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
 import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
@@ -33,12 +33,13 @@ import java.io.IOException;
 
 @Slf4j
 public class CassandraSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-    private final CassandraConfig cassandraConfig;
+    private final CassandraParameters cassandraParameters;
     private final SingleSplitReaderContext readerContext;
     private CqlSession session;
 
-    CassandraSourceReader(CassandraConfig cassandraConfig, SingleSplitReaderContext readerContext) {
-        this.cassandraConfig = cassandraConfig;
+    CassandraSourceReader(
+            CassandraParameters cassandraParameters, SingleSplitReaderContext readerContext) {
+        this.cassandraParameters = cassandraParameters;
         this.readerContext = readerContext;
     }
 
@@ -46,11 +47,11 @@ public class CassandraSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
     public void open() throws Exception {
         session =
                 CassandraClient.getCqlSessionBuilder(
-                                cassandraConfig.getHost(),
-                                cassandraConfig.getKeyspace(),
-                                cassandraConfig.getUsername(),
-                                cassandraConfig.getPassword(),
-                                cassandraConfig.getDatacenter())
+                                cassandraParameters.getHost(),
+                                cassandraParameters.getKeyspace(),
+                                cassandraParameters.getUsername(),
+                                cassandraParameters.getPassword(),
+                                cassandraParameters.getDatacenter())
                         .build();
     }
 
@@ -67,8 +68,8 @@ public class CassandraSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
             ResultSet resultSet =
                     session.execute(
                             CassandraClient.createSimpleStatement(
-                                    cassandraConfig.getCql(),
-                                    cassandraConfig.getConsistencyLevel()));
+                                    cassandraParameters.getCql(),
+                                    cassandraParameters.getConsistencyLevel()));
             resultSet.forEach(row -> output.collect(TypeConvertUtil.buildSeaTunnelRow(row)));
         } finally {
             this.readerContext.signalNoMoreElement();
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraFactoryTest.java b/seatunnel-connectors-v2/connector-cassandra/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraFactoryTest.java
new file mode 100644
index 000000000..3a07a7edb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra;
+
+import org.apache.seatunnel.connectors.seatunnel.cassandra.sink.CassandraSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.source.CassandraSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class CassandraFactoryTest {
+
+    @Test
+    void optionRule() {
+        Assertions.assertNotNull((new CassandraSourceFactory()).optionRule());
+        Assertions.assertNotNull((new CassandraSinkFactory()).optionRule());
+    }
+}