You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/19 08:15:31 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector V2] expose configurable options in Cassandra (#3681)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 73f63a504 [Feature][Connector V2] expose configurable options in Cassandra (#3681)
73f63a504 is described below
commit 73f63a5044856907b08ea16fb3d821bc6534a5eb
Author: Cason-ACE <35...@users.noreply.github.com>
AuthorDate: Fri May 19 16:15:22 2023 +0800
[Feature][Connector V2] expose configurable options in Cassandra (#3681)
---
docs/en/connector-v2/sink/Cassandra.md | 26 ++---
.../cassandra/config/CassandraConfig.java | 127 +++++++--------------
.../cassandra/config/CassandraParameters.java | 97 ++++++++++++++++
.../seatunnel/cassandra/sink/CassandraSink.java | 34 +++---
.../cassandra/sink/CassandraSinkFactory.java | 48 ++++++++
.../cassandra/sink/CassandraSinkWriter.java | 36 +++---
.../cassandra/source/CassandraSource.java | 29 ++---
.../cassandra/source/CassandraSourceFactory.java | 48 ++++++++
.../cassandra/source/CassandraSourceReader.java | 23 ++--
.../seatunnel/cassandra/CassandraFactoryTest.java | 33 ++++++
10 files changed, 341 insertions(+), 160 deletions(-)
diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md
index 6c0736889..73c6d3aba 100644
--- a/docs/en/connector-v2/sink/Cassandra.md
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -12,19 +12,19 @@ Write data to Apache Cassandra.
## Options
-| name | type | required | default value |
-|-------------------|--------|----------|---------------|
-| host | String | Yes | - |
-| keyspace | String | Yes | - |
-| table | String | Yes | - |
-| username | String | No | - |
-| password | String | No | - |
-| datacenter | String | No | datacenter1 |
-| consistency_level | String | No | LOCAL_ONE |
-| fields | String | No | LOCAL_ONE |
-| batch_size | String | No | 5000 |
-| batch_type | String | No | UNLOGGER |
-| async_write | String | No | true |
+| name | type | required | default value |
+|-------------------|---------|----------|---------------|
+| host | String | Yes | - |
+| keyspace | String | Yes | - |
+| table | String | Yes | - |
+| username | String | No | - |
+| password | String | No | - |
+| datacenter | String | No | datacenter1 |
+| consistency_level | String | No | LOCAL_ONE |
+| fields | String | No | - |
+| batch_size | int | No | 5000 |
+| batch_type | String | No | UNLOGGED |
+| async_write | boolean | No | true |
### host [string]
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
index 46e2cf776..7f38dc27d 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
@@ -17,99 +17,50 @@
package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
-import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-import lombok.ToString;
+public class CassandraConfig {
-import java.io.Serializable;
-import java.util.List;
+ public static final Integer DEFAULT_BATCH_SIZE = 5000;
-@Data
-@ToString
-@NoArgsConstructor
-public class CassandraConfig implements Serializable {
+ public static final Option<String> HOST =
+ Options.key("host").stringType().noDefaultValue().withDescription("");
- public static final String HOST = "host";
- public static final String USERNAME = "username";
- public static final String PASSWORD = "password";
- public static final String DATACENTER = "datacenter";
- public static final String KEYSPACE = "keyspace";
- public static final String TABLE = "table";
- public static final String CQL = "cql";
- public static final String FIELDS = "fields";
- public static final String CONSISTENCY_LEVEL = "consistency_level";
- public static final String BATCH_SIZE = "batch_size";
- public static final String BATCH_TYPE = "batch_type";
- public static final String ASYNC_WRITE = "async_write";
+ public static final Option<String> KEYSPACE =
+ Options.key("keyspace").stringType().noDefaultValue().withDescription("");
- private String host;
- private String username;
- private String password;
- private String datacenter;
- private String keyspace;
- private String table;
- private String cql;
- private List<String> fields;
- private ConsistencyLevel consistencyLevel;
- private Integer batchSize;
- private DefaultBatchType batchType;
- private Boolean asyncWrite;
+ public static final Option<String> USERNAME =
+ Options.key("username").stringType().noDefaultValue().withDescription("");
+ public static final Option<String> PASSWORD =
+ Options.key("password").stringType().noDefaultValue().withDescription("");
+ public static final Option<String> DATACENTER =
+ Options.key("datacenter").stringType().defaultValue("datacenter1").withDescription("");
- public CassandraConfig(@NonNull String host, @NonNull String keyspace) {
- this.host = host;
- this.keyspace = keyspace;
- }
+ public static final Option<String> CONSISTENCY_LEVEL =
+ Options.key("consistency_level")
+ .stringType()
+ .defaultValue("LOCAL_ONE")
+ .withDescription("");
- public static CassandraConfig getCassandraConfig(Config config) {
- CassandraConfig cassandraConfig =
- new CassandraConfig(config.getString(HOST), config.getString(KEYSPACE));
- if (config.hasPath(USERNAME)) {
- cassandraConfig.setUsername(config.getString(USERNAME));
- }
- if (config.hasPath(PASSWORD)) {
- cassandraConfig.setPassword(config.getString(PASSWORD));
- }
- if (config.hasPath(DATACENTER)) {
- cassandraConfig.setDatacenter(config.getString(DATACENTER));
- } else {
- cassandraConfig.setDatacenter("datacenter1");
- }
- if (config.hasPath(TABLE)) {
- cassandraConfig.setTable(config.getString(TABLE));
- }
- if (config.hasPath(CQL)) {
- cassandraConfig.setCql(config.getString(CQL));
- }
- if (config.hasPath(FIELDS)) {
- cassandraConfig.setFields(config.getStringList(FIELDS));
- }
- if (config.hasPath(CONSISTENCY_LEVEL)) {
- cassandraConfig.setConsistencyLevel(
- DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL)));
- } else {
- cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE);
- }
- if (config.hasPath(BATCH_SIZE)) {
- cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE));
- } else {
- cassandraConfig.setBatchSize(Integer.parseInt("5000"));
- }
- if (config.hasPath(BATCH_TYPE)) {
- cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE)));
- } else {
- cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED);
- }
- if (config.hasPath(ASYNC_WRITE)) {
- cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE));
- } else {
- cassandraConfig.setAsyncWrite(true);
- }
- return cassandraConfig;
- }
+ public static final Option<String> TABLE =
+ Options.key("table").stringType().noDefaultValue().withDescription("");
+
+ // Column names the sink writes. Deliberately has no default: when "fields" is absent the
+ // sink fills the list from the target table's schema (see CassandraSink#prepare).
+ // NOTE(review): the value is read with getStringList(...) in CassandraParameters, so a
+ // list-typed option would describe it more accurately -- confirm before changing the type.
+ public static final Option<String> FIELDS =
+ Options.key("fields").stringType().noDefaultValue().withDescription("");
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(DEFAULT_BATCH_SIZE)
+ .withDescription("");
+
+ public static final Option<String> BATCH_TYPE =
+ Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");
+
+ public static final Option<Boolean> ASYNC_WRITE =
+ Options.key("async_write").booleanType().defaultValue(true).withDescription("");
+
+ public static final Option<String> CQL =
+ Options.key("cql").stringType().noDefaultValue().withDescription("");
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
new file mode 100644
index 000000000..14a66c6ee
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Setter
+@Getter
+public class CassandraParameters implements Serializable {
+ private String host;
+ private String username;
+ private String password;
+ private String datacenter;
+ private String keyspace;
+ private String table;
+ private String cql;
+ private List<String> fields;
+ private ConsistencyLevel consistencyLevel;
+ private Integer batchSize;
+ private DefaultBatchType batchType;
+ private Boolean asyncWrite;
+
+ public void buildWithConfig(Config config) {
+ this.host = config.getString(CassandraConfig.HOST.key());
+ this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());
+
+ if (config.hasPath(CassandraConfig.USERNAME.key())) {
+ this.username = config.getString(CassandraConfig.USERNAME.key());
+ }
+ if (config.hasPath(CassandraConfig.PASSWORD.key())) {
+ this.password = config.getString(CassandraConfig.PASSWORD.key());
+ }
+ if (config.hasPath(CassandraConfig.DATACENTER.key())) {
+ this.datacenter = config.getString(CassandraConfig.DATACENTER.key());
+ } else {
+ this.datacenter = CassandraConfig.DATACENTER.defaultValue();
+ }
+ if (config.hasPath(CassandraConfig.TABLE.key())) {
+ this.table = config.getString(CassandraConfig.TABLE.key());
+ }
+ if (config.hasPath(CassandraConfig.CQL.key())) {
+ this.cql = config.getString(CassandraConfig.CQL.key());
+ }
+ if (config.hasPath(CassandraConfig.FIELDS.key())) {
+ this.fields = config.getStringList(CassandraConfig.FIELDS.key());
+ }
+ if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
+ this.consistencyLevel =
+ DefaultConsistencyLevel.valueOf(
+ config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
+ } else {
+ this.consistencyLevel =
+ DefaultConsistencyLevel.valueOf(
+ CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
+ }
+ if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
+ this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
+ } else {
+ this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
+ }
+ if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
+ this.batchType =
+ DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
+ } else {
+ this.batchType = DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
+ }
+ if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
+ this.asyncWrite = config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
+ } else {
+ this.asyncWrite = true;
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index c935c7a53..748280e66 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -51,8 +51,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.Cassand
@AutoService(SeaTunnelSink.class)
public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private CassandraConfig cassandraConfig;
-
+ private final CassandraParameters cassandraParameters = new CassandraParameters();
private SeaTunnelRowType seaTunnelRowType;
private ColumnDefinitions tableSchema;
@@ -63,8 +62,10 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE);
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult checkResult =
+ CheckConfigUtil.checkAllExists(
+ pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
if (!checkResult.isSuccess()) {
throw new CassandraConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -72,23 +73,24 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, checkResult.getMsg()));
}
- this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+ this.cassandraParameters.buildWithConfig(pluginConfig);
try (CqlSession session =
CassandraClient.getCqlSessionBuilder(
- cassandraConfig.getHost(),
- cassandraConfig.getKeyspace(),
- cassandraConfig.getUsername(),
- cassandraConfig.getPassword(),
- cassandraConfig.getDatacenter())
+ cassandraParameters.getHost(),
+ cassandraParameters.getKeyspace(),
+ cassandraParameters.getUsername(),
+ cassandraParameters.getPassword(),
+ cassandraParameters.getDatacenter())
.build()) {
- List<String> fields = cassandraConfig.getFields();
- this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable());
+ List<String> fields = cassandraParameters.getFields();
+ this.tableSchema =
+ CassandraClient.getTableSchema(session, pluginConfig.getString(TABLE.key()));
if (fields == null || fields.isEmpty()) {
List<String> newFields = new ArrayList<>();
for (int i = 0; i < tableSchema.size(); i++) {
newFields.add(tableSchema.get(i).getName().asInternal());
}
- cassandraConfig.setFields(newFields);
+ this.cassandraParameters.setFields(newFields);
} else {
for (String field : fields) {
if (!tableSchema.contains(field)) {
@@ -97,7 +99,7 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
"Field "
+ field
+ " does not exist in table "
- + config.getString(TABLE));
+ + pluginConfig.getString(TABLE.key()));
}
}
}
@@ -123,6 +125,6 @@ public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
- return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, tableSchema);
+ return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, tableSchema);
}
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
new file mode 100644
index 000000000..471847bbc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class CassandraSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Cassandra";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, CassandraConfig.TABLE)
+ .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
+ .optional(
+ CassandraConfig.DATACENTER,
+ CassandraConfig.CONSISTENCY_LEVEL,
+ CassandraConfig.FIELDS,
+ CassandraConfig.BATCH_SIZE,
+ CassandraConfig.BATCH_TYPE,
+ CassandraConfig.ASYNC_WRITE)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
index 6565909a2..394b2b46b 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private final CassandraConfig cassandraConfig;
+ private final CassandraParameters cassandraParameters;
private final SeaTunnelRowType seaTunnelRowType;
private final ColumnDefinitions tableSchema;
private final CqlSession session;
@@ -57,21 +57,21 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
private final AtomicInteger counter = new AtomicInteger(0);
public CassandraSinkWriter(
- CassandraConfig cassandraConfig,
+ CassandraParameters cassandraParameters,
SeaTunnelRowType seaTunnelRowType,
ColumnDefinitions tableSchema) {
- this.cassandraConfig = cassandraConfig;
+ this.cassandraParameters = cassandraParameters;
this.seaTunnelRowType = seaTunnelRowType;
this.tableSchema = tableSchema;
this.session =
CassandraClient.getCqlSessionBuilder(
- cassandraConfig.getHost(),
- cassandraConfig.getKeyspace(),
- cassandraConfig.getUsername(),
- cassandraConfig.getPassword(),
- cassandraConfig.getDatacenter())
+ cassandraParameters.getHost(),
+ cassandraParameters.getKeyspace(),
+ cassandraParameters.getUsername(),
+ cassandraParameters.getPassword(),
+ cassandraParameters.getDatacenter())
.build();
- this.batchStatement = BatchStatement.builder(cassandraConfig.getBatchType()).build();
+ this.batchStatement = BatchStatement.builder(cassandraParameters.getBatchType()).build();
this.boundStatementList = new ArrayList<>();
this.completionStages = new ArrayList<>();
this.preparedStatement = session.prepare(initPrepareCQL());
@@ -81,14 +81,14 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
public void write(SeaTunnelRow row) throws IOException {
BoundStatement boundStatement = this.preparedStatement.bind();
addIntoBatch(row, boundStatement);
- if (counter.getAndIncrement() >= cassandraConfig.getBatchSize()) {
+ if (counter.getAndIncrement() >= cassandraParameters.getBatchSize()) {
flush();
counter.set(0);
}
}
private void flush() {
- if (cassandraConfig.getAsyncWrite()) {
+ if (cassandraParameters.getAsyncWrite()) {
completionStages.forEach(
resultStage ->
resultStage.whenComplete(
@@ -115,14 +115,14 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) {
try {
- for (int i = 0; i < cassandraConfig.getFields().size(); i++) {
- String fieldName = cassandraConfig.getFields().get(i);
+ for (int i = 0; i < cassandraParameters.getFields().size(); i++) {
+ String fieldName = cassandraParameters.getFields().get(i);
DataType dataType = tableSchema.get(i).getType();
Object fieldValue = row.getField(seaTunnelRowType.indexOf(fieldName));
boundStatement =
TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue);
}
- if (cassandraConfig.getAsyncWrite()) {
+ if (cassandraParameters.getAsyncWrite()) {
completionStages.add(session.executeAsync(boundStatement));
} else {
boundStatementList.add(boundStatement);
@@ -134,12 +134,12 @@ public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
}
private String initPrepareCQL() {
- String[] placeholder = new String[cassandraConfig.getFields().size()];
+ String[] placeholder = new String[cassandraParameters.getFields().size()];
Arrays.fill(placeholder, "?");
return String.format(
"INSERT INTO %s (%s) VALUES (%s)",
- cassandraConfig.getTable(),
- String.join(",", cassandraConfig.getFields()),
+ cassandraParameters.getTable(),
+ String.join(",", cassandraParameters.getFields()),
String.join(",", placeholder));
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
index 77ddd1f25..6a1885967 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
@@ -53,7 +53,7 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
private SeaTunnelRowType rowTypeInfo;
- private CassandraConfig cassandraConfig;
+ private final CassandraParameters cassandraParameters = new CassandraParameters();
@Override
public String getPluginName() {
@@ -61,8 +61,9 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, CQL);
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult checkResult =
+ CheckConfigUtil.checkAllExists(pluginConfig, HOST.key(), KEYSPACE.key(), CQL.key());
if (!checkResult.isSuccess()) {
throw new CassandraConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -70,26 +71,26 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, checkResult.getMsg()));
}
- this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+ this.cassandraParameters.buildWithConfig(pluginConfig);
try (CqlSession currentSession =
CassandraClient.getCqlSessionBuilder(
- cassandraConfig.getHost(),
- cassandraConfig.getKeyspace(),
- cassandraConfig.getUsername(),
- cassandraConfig.getPassword(),
- cassandraConfig.getDatacenter())
+ pluginConfig.getString(HOST.key()),
+ pluginConfig.getString(KEYSPACE.key()),
+ cassandraParameters.getUsername(),
+ cassandraParameters.getPassword(),
+ cassandraParameters.getDatacenter())
.build()) {
Row rs =
currentSession
.execute(
CassandraClient.createSimpleStatement(
- cassandraConfig.getCql(),
- cassandraConfig.getConsistencyLevel()))
+ pluginConfig.getString(CQL.key()),
+ cassandraParameters.getConsistencyLevel()))
.one();
if (rs == null) {
throw new CassandraConnectorException(
CassandraConnectorErrorCode.NO_DATA_IN_SOURCE_TABLE,
- "No data select from this cql: " + cassandraConfig.getCql());
+ // "cql" is a plain string setting: read it with getString, exactly as it is read
+ // when the statement is executed above. getConfig expects a nested object at the
+ // path and would fail here before this exception could be raised.
+ "No data select from this cql: " + pluginConfig.getString(CQL.key()));
}
int columnSize = rs.getColumnDefinitions().size();
String[] fieldNames = new String[columnSize];
@@ -121,6 +122,6 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
SingleSplitReaderContext readerContext) throws Exception {
- return new CassandraSourceReader(cassandraConfig, readerContext);
+ return new CassandraSourceReader(cassandraParameters, readerContext);
}
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
new file mode 100644
index 000000000..9c1541115
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class CassandraSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Cassandra";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, CassandraConfig.CQL)
+ .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
+ .optional(CassandraConfig.DATACENTER, CassandraConfig.CONSISTENCY_LEVEL)
+ .build();
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return CassandraSource.class;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
index 2de5d661c..6e4917ca9 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
@@ -33,12 +33,13 @@ import java.io.IOException;
@Slf4j
public class CassandraSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
- private final CassandraConfig cassandraConfig;
+ private final CassandraParameters cassandraParameters;
private final SingleSplitReaderContext readerContext;
private CqlSession session;
- CassandraSourceReader(CassandraConfig cassandraConfig, SingleSplitReaderContext readerContext) {
- this.cassandraConfig = cassandraConfig;
+ CassandraSourceReader(
+ CassandraParameters cassandraParameters, SingleSplitReaderContext readerContext) {
+ this.cassandraParameters = cassandraParameters;
this.readerContext = readerContext;
}
@@ -46,11 +47,11 @@ public class CassandraSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
public void open() throws Exception {
session =
CassandraClient.getCqlSessionBuilder(
- cassandraConfig.getHost(),
- cassandraConfig.getKeyspace(),
- cassandraConfig.getUsername(),
- cassandraConfig.getPassword(),
- cassandraConfig.getDatacenter())
+ cassandraParameters.getHost(),
+ cassandraParameters.getKeyspace(),
+ cassandraParameters.getUsername(),
+ cassandraParameters.getPassword(),
+ cassandraParameters.getDatacenter())
.build();
}
@@ -67,8 +68,8 @@ public class CassandraSourceReader extends AbstractSingleSplitReader<SeaTunnelRo
ResultSet resultSet =
session.execute(
CassandraClient.createSimpleStatement(
- cassandraConfig.getCql(),
- cassandraConfig.getConsistencyLevel()));
+ cassandraParameters.getCql(),
+ cassandraParameters.getConsistencyLevel()));
resultSet.forEach(row -> output.collect(TypeConvertUtil.buildSeaTunnelRow(row)));
} finally {
this.readerContext.signalNoMoreElement();
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraFactoryTest.java b/seatunnel-connectors-v2/connector-cassandra/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraFactoryTest.java
new file mode 100644
index 000000000..3a07a7edb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra;
+
+import org.apache.seatunnel.connectors.seatunnel.cassandra.sink.CassandraSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.source.CassandraSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class CassandraFactoryTest {
+
+ @Test
+ void optionRule() {
+ Assertions.assertNotNull((new CassandraSourceFactory()).optionRule());
+ Assertions.assertNotNull((new CassandraSinkFactory()).optionRule());
+ }
+}