You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/13 10:44:55 UTC
[incubator-seatunnel] 01/01: Revert "[chore] delete unavailable S3 & Kafka Catalogs (#4477)"
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch revert-4477-delete-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 5f106e2cdb56ea59417b3e19a2975a3da9eff9b2
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Apr 13 18:44:45 2023 +0800
Revert "[chore] delete unavailable S3 & Kafka Catalogs (#4477)"
This reverts commit e0aec5ecec3dac6d0b1f32b0feab763d36322aca.
---
.../seatunnel/file/s3/catalog/S3Catalog.java | 200 ++++++++++++++++
.../file/s3/catalog/S3CatalogFactory.java | 48 ++++
.../file/s3/catalog/S3DataTypeConvertor.java | 63 +++++
.../seatunnel/kafka/catalog/KafkaCatalog.java | 265 +++++++++++++++++++++
.../kafka/catalog/KafkaCatalogFactory.java | 43 ++++
.../kafka/catalog/KafkaDataTypeConvertor.java | 72 ++++++
.../kafka/catalog/KafkaDataTypeConvertorTest.java | 46 ++++
7 files changed, 737 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
new file mode 100644
index 000000000..816597f05
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.seatunnel.shade.hadoop.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * S3 catalog implementation.
+ *
+ * <p>The given path directory is the database/table: existence checks, creation and deletion are
+ * all mapped onto directories of the Hadoop {@link FileSystem} obtained in {@link #open()}.
+ */
+public class S3Catalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3Catalog.class);
+
+    private final String catalogName;
+    private final Config s3Config;
+
+    private String defaultDatabase;
+    private FileSystem fileSystem;
+
+    /**
+     * Creates a catalog that is not yet connected; call {@link #open()} before using it.
+     *
+     * @param catalogName name of this catalog, must not be null
+     * @param s3Config connector configuration, must not be null
+     */
+    public S3Catalog(String catalogName, Config s3Config) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.s3Config = checkNotNull(s3Config, "s3Config cannot be null");
+    }
+
+    /**
+     * Resolves the default database from the configured file path and obtains the file system.
+     *
+     * @throws CatalogException if the file system cannot be obtained
+     */
+    @Override
+    public void open() throws CatalogException {
+        // The read strategy is only needed to derive the Hadoop configuration; build it once.
+        ReadStrategy readStrategy =
+                ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
+        readStrategy.setPluginConfig(s3Config);
+        this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
+        try {
+            fileSystem =
+                    FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config)));
+        } catch (IOException e) {
+            throw new CatalogException("Open S3Catalog failed", e);
+        }
+        LOGGER.info("S3Catalog {} is opened", catalogName);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        // NOTE(review): the FileSystem obtained in open() is not closed here. FileSystem.get may
+        // hand out a shared, cached instance, so closing it could affect other users - confirm
+        // before adding a close call.
+        LOGGER.info("S3Catalog {} is closed", catalogName);
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultDatabase;
+    }
+
+    /**
+     * Checks whether the given path exists and is a directory.
+     *
+     * @param databaseName path of the database directory
+     * @return true if the path is an existing directory, false if it does not exist
+     * @throws CatalogException if the check itself fails
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        // check if the directory exists
+        try {
+            return fileSystem
+                    .getFileStatus(new org.apache.hadoop.fs.Path(databaseName))
+                    .isDirectory();
+        } catch (FileNotFoundException e) {
+            LOGGER.debug("Database {} does not exist", databaseName, e);
+            return false;
+        } catch (Exception ex) {
+            throw new CatalogException("Check database exists failed", ex);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        // todo: Do we need to find all sub directory as database?
+        if (databaseExists(defaultDatabase)) {
+            return Lists.newArrayList(defaultDatabase);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        // The database directory is itself the only table; a missing database yields no tables.
+        if (databaseExists(databaseName)) {
+            return Lists.newArrayList(databaseName);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        return databaseExists(tablePath.getTableName());
+    }
+
+    /**
+     * Builds a {@link CatalogTable} for the given path with an empty schema and no options.
+     *
+     * @param tablePath path of the table, must not be null
+     * @return a table descriptor carrying only the identifier
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(
+                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+        // todo:
+        TableSchema tableSchema = TableSchema.builder().build();
+        return CatalogTable.of(
+                tableIdentifier, tableSchema, Collections.emptyMap(), Collections.emptyList(), "");
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        // Table and database share the same directory, so creation is delegated.
+        createDatabase(tablePath, ignoreIfExists);
+    }
+
+    /**
+     * Recursively deletes the directory named by the table name of {@code tablePath}.
+     *
+     * @param tablePath path of the table, must not be null
+     * @param ignoreIfNotExists when false, a missing path raises {@link TableNotExistException}
+     * @throws TableNotExistException if the path is missing and {@code ignoreIfNotExists} is false
+     * @throws CatalogException if the file system operation fails
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tablePath.getTableName());
+        try {
+            if (!fileSystem.exists(path)) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new TableNotExistException(catalogName, tablePath);
+            }
+            fileSystem.delete(path, true);
+        } catch (IOException e) {
+            throw new CatalogException("Drop table failed", e);
+        }
+    }
+
+    /**
+     * Creates the directory named by the table name of {@code tablePath}.
+     *
+     * <p>A directory (not a file) is created so that {@link #databaseExists(String)} and {@link
+     * #tableExists(TablePath)}, which both test for a directory, see the result.
+     *
+     * @param tablePath path to create, must not be null
+     * @param ignoreIfExists when false, an existing path raises {@link
+     *     TableAlreadyExistException}
+     * @throws CatalogException if the file system operation fails
+     */
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        // todo: Do we need to set schema?
+        checkNotNull(tablePath, "tablePath cannot be null");
+        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tablePath.getTableName());
+        try {
+            if (fileSystem.exists(path)) {
+                if (ignoreIfExists) {
+                    return;
+                }
+                throw new TableAlreadyExistException(catalogName, tablePath);
+            }
+            if (!fileSystem.mkdirs(path)) {
+                LOGGER.warn(
+                        "Directory {} was not created at catalog {}",
+                        tablePath.getTableName(),
+                        catalogName);
+            }
+        } catch (FileAlreadyExistsException e) {
+            // Raised when the path (or a parent) already exists as a regular file.
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(catalogName, tablePath, e);
+            }
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format(
+                            "Create table %s at catalog %s failed",
+                            tablePath.getTableName(), catalogName),
+                    e);
+        }
+    }
+
+    /**
+     * Recursively deletes the directory named by the database name of {@code tablePath}.
+     *
+     * @param tablePath path whose database should be dropped, must not be null
+     * @param ignoreIfNotExists when false, a missing path raises {@link
+     *     DatabaseNotExistException}
+     * @throws DatabaseNotExistException if the path is missing and {@code ignoreIfNotExists} is
+     *     false
+     * @throws CatalogException if the file system operation fails
+     */
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        org.apache.hadoop.fs.Path path =
+                new org.apache.hadoop.fs.Path(tablePath.getDatabaseName());
+        try {
+            if (!fileSystem.exists(path)) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+            }
+            fileSystem.delete(path, true);
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Drop database: %s failed", tablePath.getDatabaseName()), e);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
new file mode 100644
index 000000000..569f070e6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+
+/** Factory that builds {@link S3Catalog} instances from a {@link ReadonlyConfig}. */
+public class S3CatalogFactory implements CatalogFactory {
+
+    /**
+     * Converts the read-only options into a typesafe {@link Config} and wraps it in a catalog.
+     *
+     * @param catalogName name given to the created catalog
+     * @param options connector options
+     * @return a new, not yet opened {@link S3Catalog}
+     */
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // todo:
+        final Config parsedOptions = ConfigFactory.parseMap(options.toMap());
+        final Catalog catalog = new S3Catalog(catalogName, parsedOptions);
+        return catalog;
+    }
+
+    /** Returns the plugin name of the S3 file system type. */
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.S3.getFileSystemPluginName();
+    }
+
+    /**
+     * Returns the option rule of this factory.
+     *
+     * <p>NOTE(review): no rule is defined yet, so this returns null - confirm callers tolerate it.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return null;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
new file mode 100644
index 000000000..93f175600
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Data type convertor registered under the identity {@code "S3"}; the connector-side type is
+ * {@link SeaTunnelRowType}, so conversions are pass-through or a checked cast.
+ */
+@AutoService(DataTypeConvertor.class)
+public class S3DataTypeConvertor implements DataTypeConvertor<SeaTunnelRowType> {
+
+    /**
+     * Parses a textual type definition via {@link CatalogTableUtil#parseDataType}.
+     *
+     * @param connectorDataType textual type, must not be null
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        return CatalogTableUtil.parseDataType(
+                checkNotNull(connectorDataType, "connectorDataType can not be null"));
+    }
+
+    /** Returns the given row type unchanged; {@code dataTypeProperties} is not consulted. */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            SeaTunnelRowType connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    /**
+     * Casts the given type to {@link SeaTunnelRowType}.
+     *
+     * @throws DataTypeConvertException if the given type is not a {@link SeaTunnelRowType}
+     */
+    @Override
+    public SeaTunnelRowType toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        // Only row types can be represented on the connector side.
+        if (seaTunnelDataType instanceof SeaTunnelRowType) {
+            return (SeaTunnelRowType) seaTunnelDataType;
+        }
+        throw DataTypeConvertException.convertToConnectorDataTypeException(seaTunnelDataType);
+    }
+
+    @Override
+    public String getIdentity() {
+        return "S3";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
new file mode 100644
index 000000000..d23890d47
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This is a KafkaCatalog implementation.
+ *
+ * <p>In kafka the database and table both are the topic name. All operations are carried out
+ * through the {@link AdminClient} created in {@link #open()}.
+ */
+public class KafkaCatalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class);
+    private final String catalogName;
+    private final String bootstrapServers;
+    private final String defaultTopic;
+
+    private AdminClient adminClient;
+
+    /**
+     * Creates a catalog that is not yet connected; call {@link #open()} before using it.
+     *
+     * @param catalogName name of this catalog, must not be null
+     * @param defaultTopic topic reported as the default database, must not be null
+     * @param bootstrapServers Kafka bootstrap servers, must not be null
+     */
+    public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.bootstrapServers = checkNotNull(bootstrapServers, "bootstrapServers cannot be null");
+        this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be null");
+    }
+
+    /**
+     * Creates the admin client and verifies that the default topic exists.
+     *
+     * <p>If verification fails the admin client is closed again so it does not leak.
+     *
+     * @throws DatabaseNotExistException if the default topic does not exist
+     * @throws CatalogException if the connection or the lookup fails
+     */
+    @Override
+    public void open() throws CatalogException {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        adminClient = AdminClient.create(properties);
+        try {
+            TopicDescription topicDescription = getTopicDescription(defaultTopic);
+            if (topicDescription == null) {
+                throw new DatabaseNotExistException(catalogName, defaultTopic);
+            }
+            LOGGER.info(
+                    "Catalog {} is established connection to {}, the default database is {}",
+                    catalogName,
+                    bootstrapServers,
+                    topicDescription.name());
+        } catch (DatabaseNotExistException e) {
+            releaseAdminClient();
+            throw e;
+        } catch (Exception e) {
+            releaseAdminClient();
+            restoreInterrupt(e);
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s establish connection to %s error",
+                            catalogName, bootstrapServers),
+                    e);
+        }
+    }
+
+    /** Closes the admin client if one is open; safe to call when {@link #open()} never ran. */
+    @Override
+    public void close() throws CatalogException {
+        releaseAdminClient();
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultTopic;
+    }
+
+    /**
+     * Checks whether a topic with the given name exists.
+     *
+     * @param databaseName topic name, must not be null
+     * @return true if the topic exists, false otherwise
+     * @throws CatalogException if the lookup fails for a reason other than a missing topic
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkNotNull(databaseName, "databaseName cannot be null");
+        try {
+            TopicDescription topicDescription = getTopicDescription(databaseName);
+            return topicDescription != null;
+        } catch (Exception e) {
+            restoreInterrupt(e);
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s check database : %s exists error",
+                            catalogName, databaseName),
+                    e);
+        }
+    }
+
+    /** Lists all topic names reported by the admin client. */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            ListTopicsResult listTopicsResult = adminClient.listTopics();
+            Set<String> topics = listTopicsResult.names().get();
+            return Lists.newArrayList(topics);
+        } catch (InterruptedException | ExecutionException e) {
+            restoreInterrupt(e);
+            throw new CatalogException(
+                    String.format("Listing database in catalog %s error", catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        // The topic is both the database and its single table.
+        return Lists.newArrayList(databaseName);
+    }
+
+    /**
+     * Checks whether the topic named by the table name of {@code tablePath} exists.
+     *
+     * <p>The table name is used so that this check agrees with {@link #getTable}, {@link
+     * #createTable} and {@link #dropTable}, which all operate on the table name.
+     */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        return databaseExists(tablePath.getTableName());
+    }
+
+    /**
+     * Describes the topic named by the table name and exposes it as a {@link CatalogTable} with an
+     * empty schema and the connector options derived from the topic.
+     *
+     * @throws TableNotExistException if the topic does not exist
+     * @throws CatalogException if the lookup fails
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        TopicDescription topicDescription;
+        try {
+            topicDescription = getTopicDescription(tablePath.getTableName());
+            if (topicDescription == null) {
+                throw new TableNotExistException(catalogName, tablePath);
+            }
+        } catch (ExecutionException | InterruptedException e) {
+            restoreInterrupt(e);
+            throw new CatalogException(
+                    String.format("Catalog : %s get table : %s error", catalogName, tablePath), e);
+        }
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(
+                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+        // todo: Set the schema of the table?
+        TableSchema tableSchema = TableSchema.builder().build();
+        return CatalogTable.of(
+                tableIdentifier,
+                tableSchema,
+                buildConnectorOptions(topicDescription),
+                Collections.emptyList(),
+                "");
+    }
+
+    /**
+     * Creates a topic named by the table name, using the partition count and replication factor
+     * found in the table options.
+     *
+     * @param tablePath path of the table, must not be null
+     * @param table table definition whose options carry partition and replication settings
+     * @param ignoreIfExists when true, an already existing topic is silently accepted
+     * @throws TableAlreadyExistException if the topic exists and {@code ignoreIfExists} is false
+     * @throws CatalogException if the options are missing/non-numeric or topic creation fails
+     */
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        checkNotNull(table, "table cannot be null");
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+        Map<String, String> options = table.getOptions();
+        int partitionNumber;
+        short replicationFactor;
+        try {
+            // parseInt/parseShort raise NumberFormatException for null as well as for garbage.
+            partitionNumber = Integer.parseInt(options.get(Config.PARTITION.key()));
+            replicationFactor = Short.parseShort(options.get(Config.REPLICATION_FACTOR));
+        } catch (NumberFormatException e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s create table : %s error, options '%s' and '%s' must be numeric",
+                            catalogName,
+                            tablePath.getFullName(),
+                            Config.PARTITION.key(),
+                            Config.REPLICATION_FACTOR),
+                    e);
+        }
+        NewTopic newTopic =
+                new NewTopic(tablePath.getTableName(), partitionNumber, replicationFactor);
+        CreateTopicsResult createTopicsResult =
+                adminClient.createTopics(Lists.newArrayList(newTopic));
+        try {
+            createTopicsResult.all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            restoreInterrupt(e);
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s create table : %s error",
+                            catalogName, tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    /**
+     * Deletes the topic named by the table name.
+     *
+     * @param tablePath path of the table, must not be null
+     * @param ignoreIfNotExists when true, a missing topic is silently accepted
+     * @throws TableNotExistException if the topic is missing and {@code ignoreIfNotExists} is
+     *     false
+     * @throws CatalogException if topic deletion fails
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        DeleteTopicsResult deleteTopicsResult =
+                adminClient.deleteTopics(Lists.newArrayList(tablePath.getTableName()));
+        try {
+            deleteTopicsResult.all().get();
+        } catch (InterruptedException | ExecutionException e) {
+            restoreInterrupt(e);
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s drop table : %s error",
+                            catalogName, tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        // todo: We cannot create topic here, since we don't know the partition number and
+        // replication factor.
+        throw new UnsupportedOperationException("Kafka catalog does not support create database");
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        // todo:
+        dropTable(tablePath, ignoreIfNotExists);
+    }
+
+    /**
+     * Describes a single topic.
+     *
+     * @return the description, or null when Kafka reports that the topic does not exist
+     */
+    private TopicDescription getTopicDescription(String topicName)
+            throws ExecutionException, InterruptedException {
+        DescribeTopicsResult describeTopicsResult =
+                adminClient.describeTopics(Lists.newArrayList(topicName));
+        KafkaFuture<TopicDescription> topicDescriptionKafkaFuture =
+                describeTopicsResult.topicNameValues().get(topicName);
+        try {
+            return topicDescriptionKafkaFuture.get();
+        } catch (ExecutionException e) {
+            // A missing topic fails the future instead of completing it with null; translate
+            // that case so callers can implement "exists" checks.
+            if (e.getCause()
+                    instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+                return null;
+            }
+            throw e;
+        }
+    }
+
+    /** Derives connector options (servers, topic, partition count, replication) from a topic. */
+    private Map<String, String> buildConnectorOptions(TopicDescription topicDescription) {
+        String topicName = topicDescription.name();
+        List<TopicPartitionInfo> partitions = topicDescription.partitions();
+        // todo: Do we need to support partition has different replication factor?
+        // The first partition is taken as representative; guard against an empty list.
+        List<Node> replicas =
+                partitions.isEmpty() ? Collections.<Node>emptyList() : partitions.get(0).replicas();
+        Map<String, String> options = new HashMap<>();
+        options.put(Config.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+        options.put(Config.TOPIC.key(), topicName);
+        options.put(Config.PARTITION.key(), String.valueOf(partitions.size()));
+        options.put(Config.REPLICATION_FACTOR, String.valueOf(replicas.size()));
+        return options;
+    }
+
+    /** Closes and forgets the admin client, if any. */
+    private void releaseAdminClient() {
+        if (adminClient != null) {
+            adminClient.close();
+            adminClient = null;
+        }
+    }
+
+    /** Re-asserts the thread's interrupt flag when the given failure was an interruption. */
+    private static void restoreInterrupt(Throwable failure) {
+        if (failure instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
new file mode 100644
index 000000000..a30e459d0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+
+public class KafkaCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // todo: Do we need to use singleton here?
+ return null;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ // todo:
+ return null;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
new file mode 100644
index 000000000..20b4801ca
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Data type convertor for Kafka. Types only exist for fields declared in the schema, e.g.
+ *
+ * <pre>
+ * schema = {
+ *      fields {
+ *          name = "string"
+ *          age = "int"
+ *      }
+ * }
+ * </pre>
+ *
+ * <p>The Kafka-side type currently is the SeaTunnel type itself, so both conversion directions
+ * hand the given type back unchanged.
+ */
+@AutoService(DataTypeConvertor.class)
+public class KafkaDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    /**
+     * Parses a textual type definition via {@link CatalogTableUtil#parseDataType}.
+     *
+     * @param connectorDataType textual type, must not be null
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        return CatalogTableUtil.parseDataType(
+                checkNotNull(connectorDataType, "connectorDataType can not be null"));
+    }
+
+    /** Returns {@code connectorDataType} as is; {@code dataTypeProperties} is not consulted. */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        final SeaTunnelDataType<?> unchanged = connectorDataType;
+        return unchanged;
+    }
+
+    /** Returns {@code seaTunnelDataType} as is; {@code dataTypeProperties} is not consulted. */
+    @Override
+    public SeaTunnelDataType<?> toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        final SeaTunnelDataType<?> unchanged = seaTunnelDataType;
+        return unchanged;
+    }
+
+    @Override
+    public String getIdentity() {
+        return "Kafka";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
new file mode 100644
index 000000000..c8f60025b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+/** Unit tests for the pass-through conversions of {@link KafkaDataTypeConvertor}. */
+class KafkaDataTypeConvertorTest {
+
+    private final KafkaDataTypeConvertor kafkaDataTypeConvertor = new KafkaDataTypeConvertor();
+
+    /** The connector-to-SeaTunnel direction must hand the type back unchanged. */
+    @Test
+    void toSeaTunnelType() {
+        // Previously this test called toConnectorType, leaving toSeaTunnelType untested.
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                kafkaDataTypeConvertor.toSeaTunnelType(
+                        BasicType.STRING_TYPE, Collections.emptyMap()));
+    }
+
+    /** The SeaTunnel-to-connector direction must hand the type back unchanged. */
+    @Test
+    void toConnectorType() {
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                kafkaDataTypeConvertor.toConnectorType(
+                        BasicType.STRING_TYPE, Collections.emptyMap()));
+    }
+}