Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/13 10:44:55 UTC

[incubator-seatunnel] 01/01: Revert "[chore] delete unavailable S3 & Kafka Catalogs (#4477)"

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch revert-4477-delete-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 5f106e2cdb56ea59417b3e19a2975a3da9eff9b2
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Apr 13 18:44:45 2023 +0800

    Revert "[chore] delete unavailable S3 & Kafka Catalogs (#4477)"
    
    This reverts commit e0aec5ecec3dac6d0b1f32b0feab763d36322aca.
---
 .../seatunnel/file/s3/catalog/S3Catalog.java       | 200 ++++++++++++++++
 .../file/s3/catalog/S3CatalogFactory.java          |  48 ++++
 .../file/s3/catalog/S3DataTypeConvertor.java       |  63 +++++
 .../seatunnel/kafka/catalog/KafkaCatalog.java      | 265 +++++++++++++++++++++
 .../kafka/catalog/KafkaCatalogFactory.java         |  43 ++++
 .../kafka/catalog/KafkaDataTypeConvertor.java      |  72 ++++++
 .../kafka/catalog/KafkaDataTypeConvertorTest.java  |  46 ++++
 7 files changed, 737 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
new file mode 100644
index 000000000..816597f05
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.seatunnel.shade.hadoop.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * S3 catalog implementation.
+ *
+ * <p>The configured path directory serves as both the database and the table.
+ */
+public class S3Catalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3Catalog.class);
+
+    private final String catalogName;
+    private final Config s3Config;
+
+    private String defaultDatabase;
+    private FileSystem fileSystem;
+
+    public S3Catalog(String catalogName, Config s3Config) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.s3Config = checkNotNull(s3Config, "s3Config cannot be null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        // Build the read strategy once from the configured file format; it supplies
+        // the Hadoop configuration used to open the file system below.
+        ReadStrategy readStrategy =
+                ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
+        readStrategy.setPluginConfig(s3Config);
+        this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
+        try {
+            fileSystem =
+                    FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config)));
+        } catch (IOException e) {
+            throw new CatalogException("Open S3Catalog failed", e);
+        }
+        LOGGER.info("S3Catalog {} is opened", catalogName);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        LOGGER.info("S3Catalog {} is closed", catalogName);
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultDatabase;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        // check if the directory exists
+        try {
+            return fileSystem
+                    .getFileStatus(new org.apache.hadoop.fs.Path(databaseName))
+                    .isDirectory();
+        } catch (FileNotFoundException e) {
+            LOGGER.debug("Database {} does not exist", databaseName, e);
+            return false;
+        } catch (Exception ex) {
+            throw new CatalogException("Check database exists failed", ex);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        // todo: Do we need to treat every subdirectory as a database?
+        if (databaseExists(defaultDatabase)) {
+            return Lists.newArrayList(defaultDatabase);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (databaseExists(databaseName)) {
+            return Lists.newArrayList(databaseName);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        return databaseExists(tablePath.getTableName());
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(
+                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+        // todo:
+        TableSchema tableSchema = TableSchema.builder().build();
+        return CatalogTable.of(
+                tableIdentifier, tableSchema, Collections.emptyMap(), Collections.emptyList(), "");
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        createDatabase(tablePath, ignoreIfExists);
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        try {
+            fileSystem.delete(new org.apache.hadoop.fs.Path(tablePath.getTableName()), true);
+        } catch (IOException e) {
+            throw new CatalogException("Drop table failed", e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        // todo: Do we need to set schema?
+        checkNotNull(tablePath, "tablePath cannot be null");
+        try {
+            fileSystem.create(new org.apache.hadoop.fs.Path(tablePath.getTableName()));
+        } catch (FileAlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(catalogName, tablePath, e);
+            }
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format(
+                            "Create table %s at catalog %s failed",
+                            tablePath.getTableName(), catalogName),
+                    e);
+        }
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        try {
+            fileSystem.delete(new org.apache.hadoop.fs.Path(tablePath.getDatabaseName()), true);
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Drop database: %s failed", tablePath.getDatabaseName()), e);
+        }
+    }
+}
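
For context, a minimal sketch of how the restored S3Catalog could be exercised.
The option keys "path" and "file_format_type" are written to mirror
S3Config.FILE_PATH and S3Config.FILE_FORMAT_TYPE but are assumptions here, as is
the "bucket" key; check S3Config for the connector's actual option names.

    import org.apache.seatunnel.shade.com.typesafe.config.Config;
    import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

    import java.util.HashMap;
    import java.util.Map;

    public class S3CatalogSketch {
        public static void main(String[] args) {
            Map<String, Object> options = new HashMap<>();
            options.put("path", "/data/default_db"); // doubles as the default database
            options.put("file_format_type", "parquet");
            options.put("bucket", "s3a://my-bucket"); // hypothetical key name
            Config config = ConfigFactory.parseMap(options);

            S3Catalog catalog = new S3Catalog("s3", config);
            try {
                catalog.open(); // builds the read strategy and opens the file system
                catalog.listDatabases().forEach(System.out::println);
            } finally {
                catalog.close();
            }
        }
    }
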
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
new file mode 100644
index 000000000..569f070e6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3CatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+
+public class S3CatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // todo:
+        Config config = ConfigFactory.parseMap(options.toMap());
+        return new S3Catalog(catalogName, config);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.S3.getFileSystemPluginName();
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return null;
+    }
+}
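
Catalog factories are looked up by factoryIdentifier(), so a caller would drive
this factory roughly as below (assuming ReadonlyConfig.fromMap exists in the
configuration API; the "path" key is illustrative). Note that optionRule()
returns null here, so any code path that validates options against the rule
would need to guard against that.

    import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    import org.apache.seatunnel.api.table.catalog.Catalog;

    import java.util.Collections;

    public class S3CatalogFactorySketch {
        public static void main(String[] args) {
            // Build a read-only view over a single illustrative option.
            ReadonlyConfig options =
                    ReadonlyConfig.fromMap(
                            Collections.singletonMap("path", "/data/default_db"));
            Catalog catalog = new S3CatalogFactory().createCatalog("s3", options);
            System.out.println(catalog.getClass().getSimpleName()); // S3Catalog
        }
    }
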
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
new file mode 100644
index 000000000..93f175600
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class S3DataTypeConvertor implements DataTypeConvertor<SeaTunnelRowType> {
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType cannot be null");
+        return CatalogTableUtil.parseDataType(connectorDataType);
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            SeaTunnelRowType connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    @Override
+    public SeaTunnelRowType toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        // transform SeaTunnelDataType to SeaTunnelRowType
+        if (!(seaTunnelDataType instanceof SeaTunnelRowType)) {
+            throw DataTypeConvertException.convertToConnectorDataTypeException(seaTunnelDataType);
+        }
+        return (SeaTunnelRowType) seaTunnelDataType;
+    }
+
+    @Override
+    public String getIdentity() {
+        return "S3";
+    }
+}
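
The convertor delegates string parsing to CatalogTableUtil.parseDataType, so a
quick sanity check looks like the sketch below (assuming parseDataType accepts
simple type names such as "string"):

    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

    public class S3DataTypeConvertorSketch {
        public static void main(String[] args) {
            S3DataTypeConvertor convertor = new S3DataTypeConvertor();
            // Parse a simple type name into the corresponding SeaTunnel type.
            SeaTunnelDataType<?> type = convertor.toSeaTunnelType("string");
            System.out.println(type); // expected to be the SeaTunnel string type
        }
    }
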
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
new file mode 100644
index 000000000..d23890d47
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Kafka catalog implementation.
+ *
+ * <p>In Kafka, both the database name and the table name map to the topic name.
+ */
+public class KafkaCatalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class);
+    private final String catalogName;
+    private final String bootstrapServers;
+    private final String defaultTopic;
+
+    private AdminClient adminClient;
+
+    public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.bootstrapServers = checkNotNull(bootstrapServers, "bootstrapServers cannot be null");
+        this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        adminClient = AdminClient.create(properties);
+        try {
+            TopicDescription topicDescription = getTopicDescription(defaultTopic);
+            if (topicDescription == null) {
+                throw new DatabaseNotExistException(catalogName, defaultTopic);
+            }
+            LOGGER.info(
+                    "Catalog {} is established connection to {}, the default database is {}",
+                    catalogName,
+                    bootstrapServers,
+                    topicDescription.name());
+        } catch (DatabaseNotExistException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s establish connection to %s error",
+                            catalogName, bootstrapServers),
+                    e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        adminClient.close();
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return defaultTopic;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkNotNull(databaseName, "databaseName cannot be null");
+        try {
+            TopicDescription topicDescription = getTopicDescription(databaseName);
+            return topicDescription != null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s check database : %s exists error",
+                            catalogName, databaseName),
+                    e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            ListTopicsResult listTopicsResult = adminClient.listTopics();
+            Set<String> topics = listTopicsResult.names().get();
+            return Lists.newArrayList(topics);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CatalogException(
+                    String.format("Listing database in catalog %s error", catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        return Lists.newArrayList(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        return databaseExists(tablePath.getDatabaseName());
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        TopicDescription topicDescription;
+        try {
+            topicDescription = getTopicDescription(tablePath.getTableName());
+            if (topicDescription == null) {
+                throw new TableNotExistException(catalogName, tablePath);
+            }
+        } catch (ExecutionException | InterruptedException e) {
+            throw new CatalogException(
+                    String.format("Catalog : %s get table : %s error", catalogName, tablePath), e);
+        }
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(
+                        catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+        // todo: Set the schema of the table?
+        TableSchema tableSchema = TableSchema.builder().build();
+        return CatalogTable.of(
+                tableIdentifier,
+                tableSchema,
+                buildConnectorOptions(topicDescription),
+                Collections.emptyList(),
+                "");
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+        Map<String, String> options = table.getOptions();
+        int partitionNumber = Integer.parseInt(options.get(Config.PARTITION.key()));
+        short replicationFactor = Short.parseShort(options.get(Config.REPLICATION_FACTOR));
+        NewTopic newTopic =
+                new NewTopic(tablePath.getTableName(), partitionNumber, replicationFactor);
+        CreateTopicsResult createTopicsResult =
+                adminClient.createTopics(Lists.newArrayList(newTopic));
+        try {
+            createTopicsResult.all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s create table : %s error",
+                            catalogName, tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        DeleteTopicsResult deleteTopicsResult =
+                adminClient.deleteTopics(Lists.newArrayList(tablePath.getTableName()));
+        try {
+            deleteTopicsResult.all().get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s drop table : %s error",
+                            catalogName, tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        // todo: We cannot create the topic here, since we don't know the partition number
+        // and replication factor.
+        throw new UnsupportedOperationException(
+                "Kafka catalog does not support creating databases");
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        // todo:
+        dropTable(tablePath, ignoreIfNotExists);
+    }
+
+    private TopicDescription getTopicDescription(String topicName)
+            throws ExecutionException, InterruptedException {
+        DescribeTopicsResult describeTopicsResult =
+                adminClient.describeTopics(Lists.newArrayList(topicName));
+        KafkaFuture<TopicDescription> topicDescriptionKafkaFuture =
+                describeTopicsResult.topicNameValues().get(topicName);
+        return topicDescriptionKafkaFuture.get();
+    }
+
+    private Map<String, String> buildConnectorOptions(TopicDescription topicDescription) {
+        String topicName = topicDescription.name();
+        List<TopicPartitionInfo> partitions = topicDescription.partitions();
+        List<Node> replicas = partitions.get(0).replicas();
+        // todo: Do we need to support partitions with different replication factors?
+        Map<String, String> options = new HashMap<>();
+        options.put(Config.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+        options.put(Config.TOPIC.key(), topicName);
+        options.put(Config.PARTITION.key(), String.valueOf(partitions.size()));
+        options.put(Config.REPLICATION_FACTOR, String.valueOf(replicas.size()));
+        return options;
+    }
+}
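
Since both the database and the table resolve to a topic, a round trip through
the restored catalog might look like this sketch (TablePath.of(database, table)
is assumed to exist in the catalog API; the broker address and topic name are
placeholders):

    import org.apache.seatunnel.api.table.catalog.CatalogTable;
    import org.apache.seatunnel.api.table.catalog.TablePath;

    public class KafkaCatalogSketch {
        public static void main(String[] args) {
            KafkaCatalog catalog = new KafkaCatalog("kafka", "events", "localhost:9092");
            try {
                catalog.open(); // fails if the default topic "events" does not exist
                TablePath path = TablePath.of("events", "events");
                if (catalog.tableExists(path)) {
                    CatalogTable table = catalog.getTable(path);
                    // Options carry the topic name, partition count, and replication factor.
                    System.out.println(table.getOptions());
                }
            } finally {
                catalog.close();
            }
        }
    }
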
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
new file mode 100644
index 000000000..a30e459d0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalogFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+
+public class KafkaCatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        // todo: Do we need to use singleton here?
+        return null;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // todo:
+        return null;
+    }
+}
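
createCatalog is still a todo above. For illustration only, one plausible way to
fill it in, assuming Config.TOPIC and Config.BOOTSTRAP_SERVERS are Option<String>
instances readable through ReadonlyConfig.get, and that the default topic doubles
as the default database, as KafkaCatalog expects:

    import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    import org.apache.seatunnel.api.table.catalog.Catalog;
    import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;

    public class KafkaCatalogFactorySketch extends KafkaCatalogFactory {
        // Hypothetical completion of createCatalog; not part of this commit.
        @Override
        public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
            return new KafkaCatalog(
                    catalogName,
                    options.get(Config.TOPIC), // default topic, used as default database
                    options.get(Config.BOOTSTRAP_SERVERS));
        }
    }
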
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
new file mode 100644
index 000000000..20b4801ca
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The data type convertor for Kafka; only fields defined in the schema have a type, e.g.
+ *
+ * <pre>
+ * schema = {
+ *    fields {
+ *      name = "string"
+ *      age = "int"
+ *    }
+ * }
+ * </pre>
+ *
+ * <p>Right now the Kafka data type is already a SeaTunnel type, so no conversion is needed.
+ */
+@AutoService(DataTypeConvertor.class)
+public class KafkaDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType cannot be null");
+        return CatalogTableUtil.parseDataType(connectorDataType);
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return seaTunnelDataType;
+    }
+
+    @Override
+    public String getIdentity() {
+        return "Kafka";
+    }
+}
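
Given the identity behaviour described in the javadoc, the convertor hands back
whatever SeaTunnel type it is given in both directions. A minimal sketch:

    import org.apache.seatunnel.api.table.type.BasicType;
    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

    import java.util.Collections;

    public class KafkaDataTypeConvertorSketch {
        public static void main(String[] args) {
            KafkaDataTypeConvertor convertor = new KafkaDataTypeConvertor();
            // Both directions are identity conversions for SeaTunnel types.
            SeaTunnelDataType<?> toSt =
                    convertor.toSeaTunnelType(BasicType.INT_TYPE, Collections.emptyMap());
            SeaTunnelDataType<?> toConn =
                    convertor.toConnectorType(BasicType.INT_TYPE, Collections.emptyMap());
            System.out.println(toSt.equals(toConn)); // true
        }
    }
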
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
new file mode 100644
index 000000000..c8f60025b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+class KafkaDataTypeConvertorTest {
+
+    private final KafkaDataTypeConvertor kafkaDataTypeConvertor = new KafkaDataTypeConvertor();
+
+    @Test
+    void toSeaTunnelType() {
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                kafkaDataTypeConvertor.toSeaTunnelType(
+                        BasicType.STRING_TYPE, Collections.emptyMap()));
+    }
+
+    @Test
+    void toConnectorType() {
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                kafkaDataTypeConvertor.toConnectorType(
+                        BasicType.STRING_TYPE, Collections.emptyMap()));
+    }
+}