Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/07/05 23:43:31 UTC

[GitHub] [incubator-seatunnel] 2013650523 opened a new pull request, #2137: Api draft

2013650523 opened a new pull request, #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137

   Add Kudu source and sink connector.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2137: [api draft] [connector] Add Kudu source and sink connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137#discussion_r914357631


##########
seatunnel-connectors-v2/connector-kudu/pom.xml:
##########
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.1.1-SNAPSHOT</version>

Review Comment:
   Replace `2.1.1-SNAPSHOT` with `${revision}`.



##########
pom.xml:
##########
@@ -91,6 +91,7 @@
         <module>seatunnel-plugin-discovery</module>
         <module>seatunnel-formats</module>
         <module>seatunnel-dist</module>
+        <module>seatunnel-connectors-v2/connector-kudu</module>

Review Comment:
   Should this line be removed here and added to `seatunnel-connectors-v2/pom.xml` instead?



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.*;
+import org.apache.seatunnel.api.table.type.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduInputFormat implements Serializable {
+    private static final Logger logger = LoggerFactory.getLogger(KuduInputFormat.class);
+
+    public KuduInputFormat(String kuduMaster,String tableName,String columnsList){
+        this.kuduMaster=kuduMaster;
+        this.columnsList=Arrays.asList(columnsList.split(","));
+        this.tableName=tableName;
+    }
+    /**
+     * Declare the global variable KuduClient and use it to manipulate the Kudu table
+     */
+    public KuduClient kuduClient;
+
+    /**
+     * Specify kuduMaster address
+     */
+    public   String kuduMaster;
+    public   List<String> columnsList;
+    public Schema schema;
+    public String keyColumn;
+
+    /**
+     * Specifies the name of the table
+     */
+    public   String tableName;
+    public List<ColumnSchema>  getColumnsSchemas(){
+        KuduClient.KuduClientBuilder kuduClientBuilder = new
+                KuduClient.KuduClientBuilder(kuduMaster);
+        kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+        kuduClient = kuduClientBuilder.build();
+        List<ColumnSchema> columns = null;
+        try {
+            schema = kuduClient.openTable(tableName).getSchema();
+            keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
+            columns =schema.getColumns();
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+        return columns;
+    }
+
+    public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) throws SQLException {
+
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+
+        for (int i = 0; i < seaTunnelDataTypes.length; i++) {
+            Object seatunnelField;
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+
+                if (null == rs.getObject(i)) {
+                    seatunnelField = null;
+                } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getBoolean(i);
+                } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getByte(i);
+                } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getShort(i);
+                } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getInt(i);
+                } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getLong(i);
+                } else if (seaTunnelDataType instanceof DecimalType) {
+                    Object value = rs.getObject(i);
+                    seatunnelField = value instanceof BigInteger ?
+                            new BigDecimal((BigInteger) value, 0)
+                            : value;
+                } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getFloat(i);
+                } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getDouble(i);
+                } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+                    seatunnelField = rs.getString(i);
+                } else {
+                    throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+                }
+                fields.add(seatunnelField);
+            }
+
+        return new SeaTunnelRow(fields.toArray());
+    }
+    // get SeaTunnelRowType
+    public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
+
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+
+            for (int i = 0; i < columnSchemaList.size(); i++) {
+                fieldNames.add(columnSchemaList.get(i).getName());
+                seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+            }
+        } catch (Exception e) {
+            logger .warn("get row type info exception", e);
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+    }
+
+    public void openInputFormat() {
+
+        KuduClient.KuduClientBuilder kuduClientBuilder = new
+                KuduClient.KuduClientBuilder(kuduMaster);
+        kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+        kuduClient = kuduClientBuilder.build();

Review Comment:
   `kuduClient` is assigned in several places (for example `getColumnsSchemas()` and `openInputFormat()`), and each assignment builds a new client. I'm not sure whether `kuduClient` can be a local variable instead of a public field. Or should it be created once and reused, as a singleton?
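   One way to read the singleton suggestion is "create the client once per `KuduInputFormat` and reuse it". The sketch below is a minimal, generic version of that idea under that assumption. `LazyClient` is a hypothetical helper, not part of SeaTunnel or the Kudu client library.

```java
/**
 * Hypothetical helper: builds a resource on first use and then reuses it,
 * so callers cannot accidentally create a second instance.
 */
final class LazyClient<T extends AutoCloseable> implements AutoCloseable {
    private final java.util.function.Supplier<T> factory;
    private T client; // created on the first call to get()

    LazyClient(java.util.function.Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it on the first call only. */
    synchronized T get() {
        if (client == null) {
            client = factory.get();
        }
        return client;
    }

    /** Closes the instance if one was created; safe to call more than once. */
    @Override
    public synchronized void close() throws Exception {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
```

   With the Kudu client, which implements `AutoCloseable`, the factory could be `() -> new KuduClient.KuduClientBuilder(kuduMaster).defaultOperationTimeoutMs(1800000).build()`, with `close()` called when the reader shuts down. Since `KuduInputFormat` implements `Serializable`, such a field would likely need to be `transient` and created after the format reaches the reader; that is an assumption about how the connector ships the format, which this thread does not show.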



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java:
##########
@@ -0,0 +1,161 @@
+    public void openInputFormat() {
+
+        KuduClient.KuduClientBuilder kuduClientBuilder = new
+                KuduClient.KuduClientBuilder(kuduMaster);
+        kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+        kuduClient = kuduClientBuilder.build();
+
+        logger.info("服务器地址#{}:客户端#{} 初始化成功...", kuduMaster, kuduClient);

Review Comment:
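   Please change this log message to English. It currently reads roughly "Server address #{}: client #{} initialized successfully...".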



##########
seatunnel-connectors-v2/connector-kudu/pom.xml:
##########
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.1.1-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kudu</artifactId>
+
+    <properties>

Review Comment:
   Please remove the `properties` block; the connector should inherit the parent module's `properties`.



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java:
##########
@@ -0,0 +1,161 @@
+    /**
+     *
+     * @param lowerBound The beginning of each slice
+     * @param upperBound  End of each slice
+     * @return  Get the kuduScanner object for each slice
+     */
+    public KuduScanner getKuduBuildSplit(int lowerBound,int upperBound){
+        KuduScanner kuduScanner = null;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+            KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
+                    schema.getColumn(""+keyColumn),
+                    KuduPredicate.ComparisonOp.GREATER_EQUAL,
+                    lowerBound);
+
+            KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
+                    schema.getColumn(""+keyColumn),
+                    KuduPredicate.ComparisonOp.LESS,
+                    upperBound);
+
+             kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
+                    .addPredicate(upperPred).build();
+        } catch (KuduException e) {
+            e.printStackTrace();

Review Comment:
   It would be better to log the exception with `logger` instead of calling `e.printStackTrace()`.
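   A minimal sketch of the suggestion, assuming the caller should fail fast rather than carry on: in the hunk above, the `catch` block leaves `kuduScanner` as `null`, so a later caller would likely hit a `NullPointerException` far from the real cause. `Failures.callOrFail` is a hypothetical helper. It uses `java.util.logging` only to stay self-contained; with the connector's SLF4J logger the equivalent call is `logger.error("Failed to ...", e)`.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

final class Failures {
    private static final Logger LOG = Logger.getLogger(Failures.class.getName());

    private Failures() {
    }

    /**
     * Runs {@code call}. On failure, logs the exception together with its
     * stack trace and rethrows it wrapped, instead of printing it and
     * returning a null result.
     */
    static <T> T callOrFail(String action, Callable<T> call) {
        try {
            return call.call();
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Failed to " + action, e);
            throw new IllegalStateException("Failed to " + action, e);
        }
    }
}
```

   Wrapping keeps the original exception as the cause, so the stack trace still points at the Kudu call that failed.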



##########
seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java:
##########
@@ -39,7 +39,7 @@ public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType) {
     @Override
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public void write(SeaTunnelRow element) {
-        System.out.println(Arrays.toString(element.getFields()));
+       System.out.println(Arrays.toString(element.getFields()));

Review Comment:
   `write` is called once for every row of data, so this `System.out.println` prints one line per row.
   Please remove this line.



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import com.google.auto.service.AutoService;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.*;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@AutoService(SeaTunnelSource.class)
+public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSinkState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
+
+    private Config pluginConfig;
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType rowTypeInfo;
+    private KuduInputFormat kuduInputFormat;
+    private PartitionParameter partitionParameter;
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return  this.rowTypeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, KuduSourceSplit> createReader(SourceReader.Context readerContext) {
+        return new KuduSourceReader(kuduInputFormat,readerContext);
+    }
+
+    @Override
+    public Serializer<KuduSourceSplit> getSplitSerializer() {
+        return SeaTunnelSource.super.getSplitSerializer();
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSinkState> createEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext) {
+        return new KuduSourceSplitEnumerator(enumeratorContext,partitionParameter);
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSinkState> restoreEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSinkState checkpointState) {
+        // todo:
+        return new KuduSourceSplitEnumerator(enumeratorContext,partitionParameter);
+    }
+
+    @Override
+    public Serializer<KuduSinkState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "KuduSource";
+    }
+
+    @Override
+    public void prepare(Config config) {
+
+        String kudumaster = config.getString(KuduSourceConfig.kuduMaster);
+        String tableName = config.getString(KuduSourceConfig.tableName);
+
+        String columnslist = config.getString(KuduSourceConfig.columnsList);
+
+
+        kuduInputFormat=new KuduInputFormat(kudumaster,tableName,columnslist);
+        try {
+            KuduClient.KuduClientBuilder kuduClientBuilder = new
+                    KuduClient.KuduClientBuilder(kudumaster);
+            kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+            KuduClient kuduClient = kuduClientBuilder.build();
+            partitionParameter = initPartitionParameter(kuduClient,tableName);
+            SeaTunnelRowType seaTunnelRowType =getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
+
+
+            rowTypeInfo=seaTunnelRowType;
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private PartitionParameter initPartitionParameter(KuduClient kuduClient,String tableName) {
+        String keyColumn = null;
+        ArrayList<Integer> keyList = null;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+            ArrayList<String> columnsList = new ArrayList<String>();
+            keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
+            columnsList.add(""+keyColumn);
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+            KuduScanner kuduScanner = kuduScannerBuilder.build();
+            keyList = new ArrayList<>();
+            while (kuduScanner.hasMoreRows()) {
+                RowResultIterator rowResults = kuduScanner.nextRows();
+                while (rowResults.hasNext()) {
+                    RowResult row = rowResults.next();
+                    int id = row.getInt(""+keyColumn);
+                    keyList.add(id);
+
+                }
+
+            }
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+        Collections.sort(keyList);

Review Comment:
   If you only need the minimum and maximum values of `keyColumn`, there is no need to collect every value in a list and sort it. You can track the minimum and maximum by comparing each value inside the `while` loop.
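   The single-pass idea can be sketched as below, together with one boundary detail that matters once the minimum and maximum become slices: `getKuduBuildSplit` filters with `GREATER_EQUAL lowerBound` and `LESS upperBound`, so each slice is half-open, and the last upper bound has to be `max + 1` or the row whose key equals the maximum is never read. `KeyPartitioning` and its methods are hypothetical. The `Iterator<Integer>` stands in for the keys read from `RowResultIterator`, and the split logic is an assumption, because `KuduSourceSplitEnumerator` is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helpers for range-partitioning a table on an int primary key. */
final class KeyPartitioning {
    private KeyPartitioning() {
    }

    /** Returns {min, max} after one pass over the keys, without storing or sorting them. */
    static long[] minMax(Iterator<Integer> keys) {
        if (!keys.hasNext()) {
            throw new NoSuchElementException("table has no rows to partition");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        while (keys.hasNext()) {
            int key = keys.next();
            min = Math.min(min, key);
            max = Math.max(max, key);
        }
        return new long[] {min, max};
    }

    /**
     * Splits the inclusive range [min, max] into at most {@code parallelism}
     * half-open slices [lower, upper), matching a GREATER_EQUAL / LESS scan.
     */
    static List<long[]> split(long min, long max, int parallelism) {
        if (min > max || parallelism < 1) {
            throw new IllegalArgumentException("invalid key range or parallelism");
        }
        long end = max + 1; // exclusive end, so the row whose key equals max is still read
        long total = end - min;
        int n = (int) Math.min(parallelism, total);
        List<long[]> slices = new ArrayList<>(n);
        long lower = min;
        for (int i = 0; i < n; i++) {
            // Give the first (total % n) slices one extra key so sizes differ by at most one.
            long size = total / n + (i < total % n ? 1 : 0);
            slices.add(new long[] {lower, lower + size});
            lower += size;
        }
        return slices;
    }
}
```

   `minMax` also fails with a clear message on an empty table, where the current `keyList.get(0)` would throw `IndexOutOfBoundsException`. Because `getKuduBuildSplit` takes `int` bounds, a maximum key of `Integer.MAX_VALUE` would still need special handling when `max + 1` is narrowed back to `int`.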



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -0,0 +1,179 @@
+    private PartitionParameter initPartitionParameter(KuduClient kuduClient,String tableName) {
+        String keyColumn = null;
+        ArrayList<Integer> keyList = null;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+            ArrayList<String> columnsList = new ArrayList<String>();
+            keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
+            columnsList.add(""+keyColumn);
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+            KuduScanner kuduScanner = kuduScannerBuilder.build();
+            keyList = new ArrayList<>();
+            while (kuduScanner.hasMoreRows()) {
+                RowResultIterator rowResults = kuduScanner.nextRows();
+                while (rowResults.hasNext()) {
+                    RowResult row = rowResults.next();
+                    int id = row.getInt(""+keyColumn);
+                    keyList.add(id);
+
+                }
+
+            }
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+        Collections.sort(keyList);
+
+        return new PartitionParameter(keyColumn, Long.parseLong(keyList.get(0).toString()), Long.parseLong(keyList.get(keyList.size()-1).toString()));
+    }
+
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+
+    public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
+
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+
+            for (int i = 0; i < columnSchemaList.size(); i++) {
+                fieldNames.add(columnSchemaList.get(i).getName());
+                seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+            }
+
+        } catch (Exception e) {
+            LOGGER.warn("get row type info exception", e);

Review Comment:
   If the exception is only logged here, this method will return an incomplete `SeaTunnelRowType`. I'm not sure whether this will cause problems downstream.
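   A minimal sketch of the fail-fast alternative the comment hints at, written in plain Java so it runs without a Kudu cluster. `RowType`, `mapKuduType`, and the type-name strings are hypothetical stand-ins for `SeaTunnelRowType` and `KuduTypeMapper.mapping(...)`. The only point it illustrates is that a mapping failure is rethrown with the column name instead of being logged and swallowed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowTypeSketch {
    /** Hypothetical stand-in for SeaTunnelRowType: parallel lists of field names and types. */
    static final class RowType {
        final List<String> fieldNames;
        final List<String> fieldTypes;

        RowType(List<String> fieldNames, List<String> fieldTypes) {
            this.fieldNames = Collections.unmodifiableList(fieldNames);
            this.fieldTypes = Collections.unmodifiableList(fieldTypes);
        }
    }

    /** Hypothetical mapper standing in for KuduTypeMapper.mapping(...); type names are placeholders. */
    static String mapKuduType(String kuduType) {
        switch (kuduType) {
            case "INT32": return "INT";
            case "INT64": return "BIGINT";
            case "STRING": return "STRING";
            default: throw new IllegalArgumentException("Unsupported Kudu type: " + kuduType);
        }
    }

    /** Builds the row type, failing fast instead of returning a partially filled one. */
    static RowType buildRowType(Map<String, String> columns) {
        List<String> names = new ArrayList<>();
        List<String> types = new ArrayList<>();
        for (Map.Entry<String, String> column : columns.entrySet()) {
            try {
                types.add(mapKuduType(column.getValue()));
            } catch (IllegalArgumentException e) {
                // Rethrow with the column name so the job fails during prepare()
                // rather than producing rows that don't match the declared type.
                throw new IllegalStateException("Cannot map column '" + column.getKey() + "'", e);
            }
            names.add(column.getKey());
        }
        return new RowType(names, types);
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT32");
        columns.put("name", "STRING");
        RowType rowType = buildRowType(columns);
        System.out.println(rowType.fieldNames + " " + rowType.fieldTypes); // prints "[id, name] [INT, STRING]"
    }
}
```

   Failing in `prepare()` surfaces a bad schema at job submission time, which is usually easier to diagnose than type errors in the reader later.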



##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.*;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A Kudu outputFormat
+ */
+public class KuduOutputFormat
+        implements Serializable {
+    private static final Logger logger = LoggerFactory.getLogger(KuduOutputFormat.class);
+
+    private String kuduMaster;
+    private String kuduTableName;
+    private KuduClient kuduClient;
+    private KuduSession kuduSession;
+    private KuduTable kuduTable;
+
+
+    public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
+        this.kuduMaster = kuduSinkConfig.getKuduMaster();
+        this.kuduTableName = kuduSinkConfig.getKuduTableName();
+        init();
+    }
+
+    public void write(SeaTunnelRow element) {
+
+        Insert insert = kuduTable.newInsert();
+        Schema schema = kuduTable.getSchema();
+
+        int columnCount = schema.getColumnCount();
+        PartialRow row = insert.getRow();
+        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+            ColumnSchema col = schema.getColumnByIndex(columnIndex);
+
+            try {
+
+                switch (col.getType()) {
+                    case BOOL:
+                        row.addBoolean(columnIndex, (Boolean) element.getField(columnIndex));
+                        break;
+                    case INT8:
+                        row.addByte(columnIndex, (Byte) element.getField(columnIndex));
+                        break;
+                    case INT16:
+                        row.addShort(columnIndex, (Short) element.getField(columnIndex));
+                        break;
+                    case INT32:
+                        row.addInt(columnIndex, (Integer) element.getField(columnIndex));
+                        break;
+                    case INT64:
+                        row.addLong(columnIndex, (Long) element.getField(columnIndex));
+                        break;
+                    case UNIXTIME_MICROS:
+                        if (element.getField(columnIndex) instanceof Timestamp) {
+                            row.addTimestamp(columnIndex, (Timestamp) element.getField(columnIndex));
+                        } else {
+                            row.addLong(columnIndex, (Long) element.getField(columnIndex));
+                        }
+                        break;
+                    case FLOAT:
+                        row.addFloat(columnIndex, (Float) element.getField(columnIndex));
+                        break;
+                    case DOUBLE:
+                        row.addDouble(columnIndex, (Double) element.getField(columnIndex));
+                        break;
+                    case STRING:
+                        row.addString(columnIndex, element.getField(columnIndex).toString());
+
+                        break;
+                    case BINARY:
+                        if (element.getField(columnIndex) instanceof byte[]) {
+                            row.addBinary(columnIndex, (byte[]) element.getField(columnIndex));
+                        } else {
+                            row.addBinary(columnIndex, (ByteBuffer) element.getField(columnIndex));
+                        }
+                        break;
+                    case DECIMAL:
+                        row.addDecimal(columnIndex, (BigDecimal) element.getField(columnIndex));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unsupported column type: " + col.getType());
+                }
+            } catch (ClassCastException e) {
+                e.printStackTrace();
+                throw new IllegalArgumentException(
+                        "Value type does not match column type " + col.getType() +
+                                " for column " + col.getName());
+            }
+
+        }
+
+        try {
+            kuduSession.apply(insert);
+
+           // kuduClient.close();
+           // kuduSession.close();
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    public void init() {
+
+
+        KuduClient.KuduClientBuilder kuduClientBuilder = new
+                KuduClient.KuduClientBuilder(kuduMaster);
+        kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+        this.kuduClient = kuduClientBuilder.build();
+        this.kuduSession = kuduClient.newSession();

Review Comment:
   When will `kuduSession` be closed?
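   One possible answer, sketched under assumptions: make the output format `AutoCloseable` and have the sink writer's `close()` call it. In the Kudu Java client, `KuduSession.close()` flushes pending operations before closing the session, and `KuduClient.close()` shuts down the client. The sketch below models both as plain `AutoCloseable`s so it runs without Kudu; `OutputFormatSketch` is a hypothetical name, not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an output format that owns a session and a client. */
final class OutputFormatSketch implements AutoCloseable {
    private final AutoCloseable session; // stands in for KuduSession (its close() flushes pending ops)
    private final AutoCloseable client;  // stands in for KuduClient

    OutputFormatSketch(AutoCloseable session, AutoCloseable client) {
        this.session = session;
        this.client = client;
    }

    @Override
    public void close() throws Exception {
        // Close the session first so buffered operations are flushed while the client is still open;
        // the finally block makes sure the client is closed even if closing the session fails.
        try {
            session.close();
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> closed = new ArrayList<>();
        try (OutputFormatSketch format = new OutputFormatSketch(
                () -> closed.add("session"), () -> closed.add("client"))) {
            // write(...) calls would go here
        }
        System.out.println(closed); // prints "[session, client]"
    }
}
```

   Tying the close to the writer's lifecycle avoids both the commented-out per-row `close()` calls in `write()` and leaking the session when the job ends.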





[GitHub] [incubator-seatunnel] 2013650523 commented on a diff in pull request #2137: [api draft] [connector] Add Kudu source and sink connector

Posted by GitBox <gi...@apache.org>.
2013650523 commented on code in PR #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137#discussion_r915401010


##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import com.google.auto.service.AutoService;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.*;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@AutoService(SeaTunnelSource.class)
+public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSinkState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
+
+    private Config pluginConfig;
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType rowTypeInfo;
+    private KuduInputFormat kuduInputFormat;
+    private PartitionParameter partitionParameter;
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return  this.rowTypeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, KuduSourceSplit> createReader(SourceReader.Context readerContext) {
+        return new KuduSourceReader(kuduInputFormat,readerContext);
+    }
+
+    @Override
+    public Serializer<KuduSourceSplit> getSplitSerializer() {
+        return SeaTunnelSource.super.getSplitSerializer();
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSinkState> createEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext) {
+        return new KuduSourceSplitEnumerator(enumeratorContext,partitionParameter);
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSinkState> restoreEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSinkState checkpointState) {
+        // todo:
+        return new KuduSourceSplitEnumerator(enumeratorContext,partitionParameter);
+    }
+
+    @Override
+    public Serializer<KuduSinkState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "KuduSource";
+    }
+
+    @Override
+    public void prepare(Config config) {
+
+        String kudumaster = config.getString(KuduSourceConfig.kuduMaster);
+        String tableName = config.getString(KuduSourceConfig.tableName);
+
+        String columnslist = config.getString(KuduSourceConfig.columnsList);
+
+
+        kuduInputFormat=new KuduInputFormat(kudumaster,tableName,columnslist);
+        try {
+            KuduClient.KuduClientBuilder kuduClientBuilder = new
+                    KuduClient.KuduClientBuilder(kudumaster);
+            kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+            KuduClient kuduClient = kuduClientBuilder.build();
+            partitionParameter = initPartitionParameter(kuduClient,tableName);
+            SeaTunnelRowType seaTunnelRowType =getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
+
+
+            rowTypeInfo=seaTunnelRowType;
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private PartitionParameter initPartitionParameter(KuduClient kuduClient,String tableName) {
+        String keyColumn = null;
+        ArrayList<Integer> keyList = null;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+            ArrayList<String> columnsList = new ArrayList<String>();
+            keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
+            columnsList.add(""+keyColumn);
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+            KuduScanner kuduScanner = kuduScannerBuilder.build();
+            keyList = new ArrayList<>();
+            while (kuduScanner.hasMoreRows()) {
+                RowResultIterator rowResults = kuduScanner.nextRows();
+                while (rowResults.hasNext()) {
+                    RowResult row = rowResults.next();
+                    int id = row.getInt(""+keyColumn);
+                    keyList.add(id);
+
+                }
+
+            }
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+        Collections.sort(keyList);

Review Comment:
   OK, thanks.





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2137: [api draft] [connector] Add Kudu source and sink connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137#discussion_r914335416


##########
seatunnel-examples/seatunnel-flink-new-connector-example/pom.xml:
##########
@@ -58,9 +58,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-socket</artifactId>
+            <artifactId>connector-kudu</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.47</version>
+            <scope>compile</scope>

Review Comment:
   We shouldn't depend on MySQL here; it should only be used with `test` or `provided` scope.





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2137: [api draft] [connector] Add Kudu source and sink connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137#discussion_r914335832


##########
seatunnel-connectors-v2/connector-kudu/pom.xml:
##########
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.1.1-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kudu</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+    <dependencies>
+    <dependency>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-api</artifactId>
+        <version>${project.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.kudu</groupId>
+        <artifactId>kudu-client</artifactId>
+        <version>1.11.1</version>
+    </dependency>
+
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+        <groupId>commons-collections</groupId>
+        <artifactId>commons-collections</artifactId>
+        <version>3.2.2</version>
+        <scope>compile</scope>
+    </dependency>

Review Comment:
   All dependencies should be managed centrally in the root POM.





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2137: [api draft] [connector] Add Kudu source and sink connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137#discussion_r914377864


##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import com.google.auto.service.AutoService;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.*;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@AutoService(SeaTunnelSource.class)
+public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSinkState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
+
+    private Config pluginConfig;
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType rowTypeInfo;
+    private KuduInputFormat kuduInputFormat;
+    private PartitionParameter partitionParameter;
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return  this.rowTypeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, KuduSourceSplit> createReader(SourceReader.Context readerContext) {
+        return new KuduSourceReader(kuduInputFormat,readerContext);
+    }
+
+    @Override
+    public Serializer<KuduSourceSplit> getSplitSerializer() {
+        return SeaTunnelSource.super.getSplitSerializer();
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSinkState> createEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext) {
+        return new KuduSourceSplitEnumerator(enumeratorContext,partitionParameter);
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSinkState> restoreEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSinkState checkpointState) {
+        // todo:
+        return new KuduSourceSplitEnumerator(enumeratorContext,partitionParameter);
+    }
+
+    @Override
+    public Serializer<KuduSinkState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "KuduSource";
+    }
+
+    @Override
+    public void prepare(Config config) {
+
+        String kudumaster = config.getString(KuduSourceConfig.kuduMaster);
+        String tableName = config.getString(KuduSourceConfig.tableName);
+
+        String columnslist = config.getString(KuduSourceConfig.columnsList);
+
+
+        kuduInputFormat=new KuduInputFormat(kudumaster,tableName,columnslist);
+        try {
+            KuduClient.KuduClientBuilder kuduClientBuilder = new
+                    KuduClient.KuduClientBuilder(kudumaster);
+            kuduClientBuilder.defaultOperationTimeoutMs(1800000);
+
+            KuduClient kuduClient = kuduClientBuilder.build();
+            partitionParameter = initPartitionParameter(kuduClient,tableName);
+            SeaTunnelRowType seaTunnelRowType =getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
+
+
+            rowTypeInfo=seaTunnelRowType;
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private PartitionParameter initPartitionParameter(KuduClient kuduClient,String tableName) {
+        String keyColumn = null;
+        ArrayList<Integer> keyList = null;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+            ArrayList<String> columnsList = new ArrayList<String>();
+            keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
+            columnsList.add(""+keyColumn);
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+            KuduScanner kuduScanner = kuduScannerBuilder.build();
+            keyList = new ArrayList<>();
+            while (kuduScanner.hasMoreRows()) {
+                RowResultIterator rowResults = kuduScanner.nextRows();
+                while (rowResults.hasNext()) {
+                    RowResult row = rowResults.next();
+                    int id = row.getInt(""+keyColumn);
+                    keyList.add(id);
+
+                }
+
+            }
+        } catch (KuduException e) {
+            e.printStackTrace();
+        }
+        Collections.sort(keyList);

Review Comment:
   If you only need to know the minimum and maximum values of `keyColumn`, you don't need to add all the data to the list. You can track the maximum and minimum values by comparing them inside the while loop.
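   A sketch of the single-pass approach, assuming the key column is an integer as in the diff. `KeyRangeTracker` is a hypothetical helper: inside the scanner loop one would call `tracker.accept(row.getInt(keyColumn))` instead of `keyList.add(id)`, then read `getMin()` and `getMax()` after the loop. It uses constant memory instead of buffering every key, skips the O(n log n) sort, and fails with a clear message on an empty table, where the original `keyList.get(0)` would throw `IndexOutOfBoundsException`.

```java
/**
 * Hypothetical helper: tracks the minimum and maximum of a key column in one pass,
 * so the scanner loop does not need to buffer and sort every key.
 */
final class KeyRangeTracker {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;
    private long count = 0;

    /** Call once per row, e.g. tracker.accept(row.getInt(keyColumn)) inside the scanner loop. */
    void accept(long key) {
        if (key < min) {
            min = key;
        }
        if (key > max) {
            max = key;
        }
        count++;
    }

    boolean isEmpty() {
        return count == 0;
    }

    long getMin() {
        requireNonEmpty();
        return min;
    }

    long getMax() {
        requireNonEmpty();
        return max;
    }

    private void requireNonEmpty() {
        // An empty table has no key range; fail clearly instead of with IndexOutOfBoundsException.
        if (count == 0) {
            throw new IllegalStateException("key column contained no rows");
        }
    }

    public static void main(String[] args) {
        KeyRangeTracker tracker = new KeyRangeTracker();
        for (long key : new long[] {42, 7, 99, 13}) {
            tracker.accept(key);
        }
        System.out.println(tracker.getMin() + " " + tracker.getMax()); // prints "7 99"
    }
}
```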





[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2137: [api draft] [connector] Add Kudu source and sink connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #2137:
URL: https://github.com/apache/incubator-seatunnel/pull/2137#issuecomment-1185403669

   We changed the binary release rule: the binary release will no longer contain connectors-v2, so v2 connectors no longer need to add their dependency jars to `tools/dependencies/known-dependencies.txt` or the `LICENSE` file.
   
   More information can be found here: https://github.com/apache/incubator-seatunnel/pull/2162.
   
   Thank you!

