You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "EricJoy2048 (via GitHub)" <gi...@apache.org> on 2023/04/13 10:44:58 UTC

[GitHub] [incubator-seatunnel] EricJoy2048 opened a new pull request, #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

EricJoy2048 opened a new pull request, #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567

   Reverts apache/incubator-seatunnel#4477


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] ic4y commented on pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on PR #4567:
URL: https://github.com/apache/seatunnel/pull/4567#issuecomment-1666845886

   @EricJoy2048 Does this pr still need to be merged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on PR #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567#issuecomment-1514140193

   Wait CI/CD


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

Posted by "liugddx (via GitHub)" <gi...@apache.org>.
liugddx commented on PR #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567#issuecomment-1514586697

   Why does the sonar bot appear randomly?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

Posted by "liugddx (via GitHub)" <gi...@apache.org>.
liugddx commented on PR #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567#issuecomment-1514594229

   Does the database and tables in the Kafka catalog refer to topics?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

Posted by "liugddx (via GitHub)" <gi...@apache.org>.
liugddx commented on code in PR #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567#discussion_r1171219494


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The data type convertor of Kafka, only fields defined in schema has the type. e.g.
+ *
+ * <pre>
+ * schema = {
+ *    fields {
+ *      name = "string"
+ *      age = "int"
+ *    }
+ * }
+ * </pre>
+ *
+ * <p>Right now the data type of kafka is SeaTunnelType, so we don't need to convert the data type.
+ */
+@AutoService(DataTypeConvertor.class)
+public class KafkaDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> {
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        return CatalogTableUtil.parseDataType(connectorDataType);
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            SeaTunnelDataType<?> connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return connectorDataType;
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        return seaTunnelDataType;
+    }
+
+    @Override
+    public String getIdentity() {
+        return "Kafka";

Review Comment:
   Maybe we need a constant class to manage these names uniformly.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This is a KafkaCatalog implementation.
+ *
+ * <p>In kafka the database and table both are the topic name.
+ */
+public class KafkaCatalog implements Catalog {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class);
+    private final String catalogName;
+    private final String bootstrapServers;
+    private final String defaultTopic;
+
+    private AdminClient adminClient;
+
+    public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) {
+        this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
+        this.bootstrapServers = checkNotNull(bootstrapServers, "bootstrapServers cannot be null");
+        this.defaultTopic = checkNotNull(defaultTopic, "defaultTopic cannot be null");
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        adminClient = AdminClient.create(properties);
+        try {
+            TopicDescription topicDescription = getTopicDescription(defaultTopic);
+            if (topicDescription == null) {
+                throw new DatabaseNotExistException(catalogName, defaultTopic);
+            }
+            LOGGER.info(
+                    "Catalog {} is established connection to {}, the default database is {}",
+                    catalogName,
+                    bootstrapServers,
+                    topicDescription.name());
+        } catch (DatabaseNotExistException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Catalog : %s establish connection to %s error",
+                            catalogName, bootstrapServers),
+                    e);

Review Comment:
   Can throw Exception directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] sonarcloud[bot] commented on pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"

Posted by "sonarcloud[bot] (via GitHub)" <gi...@apache.org>.
sonarcloud[bot] commented on PR #4567:
URL: https://github.com/apache/incubator-seatunnel/pull/4567#issuecomment-1506778729

   SonarCloud Quality Gate failed.&nbsp; &nbsp; [![Quality Gate failed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/failed-16px.png 'Quality Gate failed')](https://sonarcloud.io/dashboard?id=apache_incubator-seatunnel&pullRequest=4567)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=BUG) [![C](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/C-16px.png 'C')](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=BUG) [6 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-seatunnel&pullRequest=4567&resolved=false&types=CODE_SMELL)
   
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/0-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=apache_incubator-seatunnel&pullRequest=4567&metric=new_coverage&view=list) [0.0% Coverage](https://sonarcloud.io/component_measures?id=apache_incubator-seatunnel&pullRequest=4567&metric=new_coverage&view=list)  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=apache_incubator-seatunnel&pullRequest=4567&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-seatunnel&pullRequest=4567&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Revert "[chore] delete unavailable S3 & Kafka Catalogs" [seatunnel]

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 closed pull request #4567: Revert "[chore] delete unavailable S3 & Kafka Catalogs"
URL: https://github.com/apache/seatunnel/pull/4567


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org