You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "liugddx (via GitHub)" <gi...@apache.org> on 2023/04/19 11:49:33 UTC

[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

liugddx commented on code in PR #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567#discussion_r1171219494


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The data type convertor of Kafka, only fields defined in schema has the type. e.g.
+ *
+ * <pre>
+ * schema = {
+ *    fields {
+ *      name = "string"
+ *      age = "int"
+ *    }
+ * }
+ * </pre>
+ *
+ * <p>Right now the data type of kafka is SeaTunnelType, so we don't need to convert the data type.
+ */
+@AutoService(DataTypeConvertor.class)
+public class KafkaDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        return CatalogTableUtil.parseDataType(connectorDataType);
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return seaTunnelDataType;
+    }
+
+    @Override
+    public String getIdentity() {
+        return "Kafka";

Review Comment:
   Maybe we need a constant class to manage these names uniformly.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This is a KafkaCatalog implementation.
+ *
+ * <p>In kafka the database and table both are the topic name.
+ */
+public class KafkaCatalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class);
+    private final String catalogName;
+    private final String bootstrapServers;
+    private final String defaultTopic;
+
+    private AdminClient adminClient;
+
+    public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.bootstrapServers = checkNotNull(bootstrapServers, "bootstrapServers cannot be null");
+        this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        adminClient = AdminClient.create(properties);
+        try {
+            TopicDescription topicDescription = getTopicDescription(defaultTopic);
+            if (topicDescription == null) {
+                throw new DatabaseNotExistException(catalogName, defaultTopic);
+            }
+            LOGGER.info(
+                    "Catalog {} is established connection to {}, the default database is {}",
+                    catalogName,
+                    bootstrapServers,
+                    topicDescription.name());
+        } catch (DatabaseNotExistException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s establish connection to %s error",
+                            catalogName, bootstrapServers),
+                    e);

Review Comment:
   Can throw Exception directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org