Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2022/06/13 07:51:06 UTC

[GitHub] [bahir-flink] eskabetxe commented on a diff in pull request #149: [BAHIR-305] Kudu Flink SQL Support DynamicSource/Sink&LookupFunction

eskabetxe commented on code in PR #149:
URL: https://github.com/apache/bahir-flink/pull/149#discussion_r895405774


##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.kudu.connector.convertor;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.RowResult;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+
+/**
+ * Transform the Kudu RowResult object into a Flink RowData object
+ */
+public class RowResultRowDataConvertor implements RowResultConvertor<RowData> {
+    @Override
+    public RowData convertor(RowResult row) {
+        Schema schema = row.getColumnProjection();
+        GenericRowData values = new GenericRowData(schema.getColumnCount());
+        schema.getColumns().forEach(column -> {
+            String name = column.getName();
+            Type type = column.getType();
+            int pos = schema.getColumnIndex(name);
+            if (Objects.isNull(type)) {
+                throw new IllegalArgumentException("columnName:" + name);
+            }
+            if (row.isNull(name)){
+                return;
+            }
+            switch (type) {
+                case DECIMAL:
+                    BigDecimal decimal = row.getDecimal(name);
+                    values.setField(pos, DecimalData.fromBigDecimal(decimal, decimal.precision(), decimal.scale()));
+                    break;
+                case UNIXTIME_MICROS:
+                    values.setField(pos, TimestampData.fromTimestamp(row.getTimestamp(name)));
+                    break;
+                case DOUBLE:
+                    values.setField(pos, row.getDouble(name));
+                    break;
+                case STRING:
+                    Object value = row.getObject(name);
+                    values.setField(pos, StringData.fromString(Objects.nonNull(value) ? value.toString() : ""));
+                    break;
+                case BINARY:
+                    values.setField(pos, row.getBinary(name));
+                    break;
+                case FLOAT:
+                    values.setField(pos, row.getFloat(name));
+                    break;
+                case INT64:
+                    values.setField(pos, row.getLong(name));
+                    break;
+                case INT32:
+                case INT16:
+                case INT8:
+                    values.setField(pos, row.getInt(name));
+                    break;
+                case BOOL:
+                    values.setField(pos, row.getBoolean(name));
+                    break;
+                default:
+                    throw new IllegalArgumentException("columnName:" + name + ",type:" + type.getName() + "不支持!");

Review Comment:
   Can you translate the message to English?
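   
   A possible English wording for that default branch, as a sketch only (exact phrasing up to the author):
   ```java
   default:
       // "不支持" means "is not supported"
       throw new IllegalArgumentException(
               "Column: " + name + ", type: " + type.getName() + " is not supported!");
   ```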



##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/RowDataUpsertOperationMapper.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
+@Internal
+public class RowDataUpsertOperationMapper extends AbstractSingleOperationMapper<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataUpsertOperationMapper.class);
+
+
+    private static final int MIN_TIME_PRECISION = 0;
+    private static final int MAX_TIME_PRECISION = 3;
+    private static final int MIN_TIMESTAMP_PRECISION = 0;
+    private static final int MAX_TIMESTAMP_PRECISION = 6;
+
+    private LogicalType[] logicalTypes;
+
+    public RowDataUpsertOperationMapper(TableSchema schema) {
+        super(schema.getFieldNames());
+        logicalTypes = Arrays.stream(schema.getFieldDataTypes())
+                .map(DataType::getLogicalType)
+                .toArray(LogicalType[]::new);
+    }
+
+    @Override
+    public Object getField(RowData input, int i) {
+        return getFieldValue(input, i);
+    }
+
+    public Object getFieldValue(RowData input, int i) {
+        if (input == null || input.isNullAt(i)) {
+            return null;
+        }
+        LogicalType fieldType = logicalTypes[i];
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR: {
+                StringData data = input.getString(i);
+                if (data != null) {
+                    return data.toString();
+                }
+                return null;
+            }
+            case BOOLEAN:
+                return input.getBoolean(i);
+            case BINARY:
+            case VARBINARY:
+                return input.getBinary(i);
+            case DECIMAL: {
+                DecimalType decimalType = (DecimalType) fieldType;
+                final int precision = decimalType.getPrecision();
+                final int scale = decimalType.getScale();
+                DecimalData data = input.getDecimal(i, precision, scale);
+                if (data != null) {
+                    return data.toBigDecimal();
+                } else {
+                    return null;
+                }
+            }
+            case TINYINT:
+                return input.getByte(i);
+            case SMALLINT:
+                return input.getShort(i);
+            case INTEGER:
+            case DATE:
+            case INTERVAL_YEAR_MONTH:
+                return input.getInt(i);
+            case TIME_WITHOUT_TIME_ZONE:
+                final int timePrecision = getPrecision(fieldType);
+                if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
+                    throw new UnsupportedOperationException(
+                            String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
+                                    "HBase connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));

Review Comment:
   HBase?
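   
   Presumably copied over from the HBase connector; a corrected message could read, for example (sketch only):
   ```java
   throw new UnsupportedOperationException(
           String.format("The precision %s of TIME type is out of the range [%s, %s] supported by " +
                   "the Kudu connector", timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
   ```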



##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.kudu.shaded.com.google.common.collect.Sets;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Factory for creating configured instances of {@link KuduDynamicTableSource}/{@link KuduDynamicTableSink} in
+ * a stream environment.
+ */
+public class KuduDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+    public static final String IDENTIFIER = "kudu";
+    public static final ConfigOption<String> KUDU_TABLE = ConfigOptions
+            .key("kudu.table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("kudu's table name");
+
+    public static final ConfigOption<String> KUDU_MASTERS =
+            ConfigOptions
+                    .key("kudu.masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's master server address");
+
+
+    public static final ConfigOption<String> KUDU_HASH_COLS =
+            ConfigOptions
+                    .key("kudu.hash-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's hash columns");
+
+    public static final ConfigOption<Integer> KUDU_REPLICAS =
+            ConfigOptions
+                    .key("kudu.replicas")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("kudu's replica nums");
+
+    public static final ConfigOption<Integer> KUDU_MAX_BUFFER_SIZE =
+            ConfigOptions
+                    .key("kudu.max-buffer-size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's max buffer size");
+
+    public static final ConfigOption<Integer> KUDU_FLUSH_INTERVAL =
+            ConfigOptions
+                    .key("kudu.flush-interval")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's data flush interval");
+
+    public static final ConfigOption<Long> KUDU_OPERATION_TIMEOUT =
+            ConfigOptions
+                    .key("kudu.operation-timeout")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("kudu's operation timeout");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_NOT_FOUND =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all not found rows");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_DUPLICATE =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all dulicate rows");
+
+    /**
+     * hash partition bucket nums
+     */
+    public static final ConfigOption<Integer> KUDU_HASH_PARTITION_NUMS =
+            ConfigOptions
+                    .key("kudu.hash-partition-nums")
+                    .intType()
+                    .defaultValue(KUDU_REPLICAS.defaultValue() * 2)
+                    .withDescription("kudu's hash partition bucket nums,defaultValue is 2 * replica nums");
+
+    public static final ConfigOption<String> KUDU_PRIMARY_KEY_COLS =
+            ConfigOptions
+                    .key("kudu.primary-key-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's primary key,primary key must be ordered");
+
+
+    public static final ConfigOption<Integer> KUDU_SCAN_ROW_SIZE =
+            ConfigOptions
+                    .key("kudu.scan.row-size")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription("kudu's scan row size");
+
+    /**
+     * lookup cache config
+     */
+    public static final ConfigOption<Long> KUDU_LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions
+                    .key("kudu.lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +
+                            "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any" +
+                            " of them is " +
+                            "specified. Cache is not enabled as default.");
+
+    public static final ConfigOption<Long> KUDU_LOOKUP_CACHE_TTL =
+            ConfigOptions
+                    .key("kudu.lookup.cache.ttl")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription("the cache time to live.");
+
+    public static final ConfigOption<Integer> KUDU_LOOKUP_MAX_RETRIES =
+            ConfigOptions
+                    .key("kudu.lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("the max retry times if lookup database failed.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ReadableConfig config = getReadableConfig(context);
+        String masterAddresses = config.get(KUDU_MASTERS);
+        String tableName = config.get(KUDU_TABLE);
+        Optional<Long> operationTimeout = config.getOptional(KUDU_OPERATION_TIMEOUT);
+        Optional<Integer> flushInterval = config.getOptional(KUDU_FLUSH_INTERVAL);
+        Optional<Integer> bufferSize = config.getOptional(KUDU_MAX_BUFFER_SIZE);
+        Optional<Boolean> ignoreNotFound = config.getOptional(KUDU_IGNORE_NOT_FOUND);
+        Optional<Boolean> ignoreDuplicate = config.getOptional(KUDU_IGNORE_DUPLICATE);
+        TableSchema schema = context.getCatalogTable().getSchema();
+        TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
+        KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema,
+                context.getCatalogTable().toProperties());
+
+        KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
+                .setMasters(masterAddresses);
+        operationTimeout.ifPresent(configBuilder::setOperationTimeout);
+        flushInterval.ifPresent(configBuilder::setFlushInterval);
+        bufferSize.ifPresent(configBuilder::setMaxBufferSize);
+        ignoreNotFound.ifPresent(configBuilder::setIgnoreNotFound);
+        ignoreDuplicate.ifPresent(configBuilder::setIgnoreDuplicate);
+        return new KuduDynamicTableSink(configBuilder, physicalSchema, tableInfo);
+    }
+
+    /**
+     * get readableConfig
+     *
+     * @param context
+     * @return {@link ReadableConfig}
+     */
+    private ReadableConfig getReadableConfig(Context context) {
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        return helper.getOptions();
+    }
+
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ReadableConfig config = getReadableConfig(context);
+        String masterAddresses = config.get(KUDU_MASTERS);
+
+        int scanRowSize = config.get(KUDU_SCAN_ROW_SIZE);
+        long kuduCacheMaxRows = config.get(KUDU_LOOKUP_CACHE_MAX_ROWS);
+        long kuduCacheTtl = config.get(KUDU_LOOKUP_CACHE_TTL);
+        int kuduMaxReties = config.get(KUDU_LOOKUP_MAX_RETRIES);
+
+        // 构造kudu lookup options

Review Comment:
   Can you translate this comment to English?
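   
   For reference, "构造kudu lookup options" translates roughly to "build the Kudu lookup options", e.g.:
   ```java
   // build the Kudu lookup options
   ```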



##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.kudu.shaded.com.google.common.collect.Sets;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Factory for creating configured instances of {@link KuduDynamicTableSource}/{@link KuduDynamicTableSink} in
+ * a stream environment.
+ */
+public class KuduDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+    public static final String IDENTIFIER = "kudu";
+    public static final ConfigOption<String> KUDU_TABLE = ConfigOptions
+            .key("kudu.table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("kudu's table name");
+
+    public static final ConfigOption<String> KUDU_MASTERS =
+            ConfigOptions
+                    .key("kudu.masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's master server address");
+
+
+    public static final ConfigOption<String> KUDU_HASH_COLS =
+            ConfigOptions
+                    .key("kudu.hash-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's hash columns");
+
+    public static final ConfigOption<Integer> KUDU_REPLICAS =
+            ConfigOptions
+                    .key("kudu.replicas")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("kudu's replica nums");
+
+    public static final ConfigOption<Integer> KUDU_MAX_BUFFER_SIZE =
+            ConfigOptions
+                    .key("kudu.max-buffer-size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's max buffer size");
+
+    public static final ConfigOption<Integer> KUDU_FLUSH_INTERVAL =
+            ConfigOptions
+                    .key("kudu.flush-interval")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's data flush interval");
+
+    public static final ConfigOption<Long> KUDU_OPERATION_TIMEOUT =
+            ConfigOptions
+                    .key("kudu.operation-timeout")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("kudu's operation timeout");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_NOT_FOUND =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all not found rows");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_DUPLICATE =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all dulicate rows");
+
+    /**
+     * hash partition bucket nums
+     */
+    public static final ConfigOption<Integer> KUDU_HASH_PARTITION_NUMS =
+            ConfigOptions
+                    .key("kudu.hash-partition-nums")
+                    .intType()
+                    .defaultValue(KUDU_REPLICAS.defaultValue() * 2)
+                    .withDescription("kudu's hash partition bucket nums,defaultValue is 2 * replica nums");

Review Comment:
   Add a space after the comma ','.
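   
   For example (sketch):
   ```java
   .withDescription("kudu's hash partition bucket nums, default value is 2 * replica nums");
   ```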



##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/AbstractKuduInputFormat.java:
##########
@@ -42,39 +43,42 @@
 
 /**
  * Input format for reading the contents of a Kudu table (defined by the provided {@link KuduTableInfo}) in both batch
- * and stream programs. Rows of the Kudu table are mapped to {@link Row} instances that can converted to other data
+ * and stream programs. Rows of the Kudu table are mapped to {@link T} instances that can converted to other data
  * types by the user later if necessary.
  *
- * <p> For programmatic access to the schema of the input rows users can use the {@link org.apache.flink.connectors.kudu.table.KuduCatalog}
+ * <p> For programmatic access to the schema of the input rows users can use the {@link KuduCatalog}
  * or overwrite the column order manually by providing a list of projected column names.
  */
 @PublicEvolving
-public class KuduRowInputFormat extends RichInputFormat<Row, KuduInputSplit> {
+public abstract class AbstractKuduInputFormat<T> extends RichInputFormat<T, KuduInputSplit> implements ResultTypeQueryable<T> {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final KuduReaderConfig readerConfig;
     private final KuduTableInfo tableInfo;
-
-    private List<KuduFilterInfo> tableFilters;
-    private List<String> tableProjections;
-
+    private final List<KuduFilterInfo> tableFilters;
+    private final List<String> tableProjections;
+    private final RowResultConvertor<T> rowResultConvertor;
     private boolean endReached;
+    private transient KuduReader<T> kuduReader;
+    private transient KuduReaderIterator<T> resultIterator;
 
-    private transient KuduReader kuduReader;
-    private transient KuduReaderIterator resultIterator;
-
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo) {
-        this(readerConfig, tableInfo, new ArrayList<>(), null);
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo) {
+        this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), null);
     }
 
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<String> tableProjections) {
-        this(readerConfig, tableInfo, new ArrayList<>(), tableProjections);
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo, List<String> tableProjections) {
+        this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), tableProjections);
     }
 
-    public KuduRowInputFormat(KuduReaderConfig readerConfig, KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
+    public AbstractKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
+                                   KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters,
+                                   List<String> tableProjections) {
 
         this.readerConfig = checkNotNull(readerConfig, "readerConfig could not be null");
+        this.rowResultConvertor = checkNotNull(rowResultConvertor, "readerConfig could not be null");

Review Comment:
   in message "readerConfig" should be rowResultConvertor 



##########
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table.dynamic;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.connectors.kudu.table.function.lookup.KuduLookupOptions;
+import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.kudu.shaded.com.google.common.collect.Sets;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Factory for creating configured instances of {@link KuduDynamicTableSource}/{@link KuduDynamicTableSink} in
+ * a stream environment.
+ */
+public class KuduDynamicTableSourceSinkFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+    public static final String IDENTIFIER = "kudu";
+    public static final ConfigOption<String> KUDU_TABLE = ConfigOptions
+            .key("kudu.table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("kudu's table name");
+
+    public static final ConfigOption<String> KUDU_MASTERS =
+            ConfigOptions
+                    .key("kudu.masters")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's master server address");
+
+
+    public static final ConfigOption<String> KUDU_HASH_COLS =
+            ConfigOptions
+                    .key("kudu.hash-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's hash columns");
+
+    public static final ConfigOption<Integer> KUDU_REPLICAS =
+            ConfigOptions
+                    .key("kudu.replicas")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("kudu's replica nums");
+
+    public static final ConfigOption<Integer> KUDU_MAX_BUFFER_SIZE =
+            ConfigOptions
+                    .key("kudu.max-buffer-size")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's max buffer size");
+
+    public static final ConfigOption<Integer> KUDU_FLUSH_INTERVAL =
+            ConfigOptions
+                    .key("kudu.flush-interval")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("kudu's data flush interval");
+
+    public static final ConfigOption<Long> KUDU_OPERATION_TIMEOUT =
+            ConfigOptions
+                    .key("kudu.operation-timeout")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("kudu's operation timeout");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_NOT_FOUND =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all not found rows");
+
+    public static final ConfigOption<Boolean> KUDU_IGNORE_DUPLICATE =
+            ConfigOptions
+                    .key("kudu.ignore-not-found")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, ignore all dulicate rows");
+
+    /**
+     * hash partition bucket nums
+     */
+    public static final ConfigOption<Integer> KUDU_HASH_PARTITION_NUMS =
+            ConfigOptions
+                    .key("kudu.hash-partition-nums")
+                    .intType()
+                    .defaultValue(KUDU_REPLICAS.defaultValue() * 2)
+                    .withDescription("kudu's hash partition bucket nums,defaultValue is 2 * replica nums");
+
+    public static final ConfigOption<String> KUDU_PRIMARY_KEY_COLS =
+            ConfigOptions
+                    .key("kudu.primary-key-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("kudu's primary key,primary key must be ordered");

Review Comment:
   Add a space after the comma ','.
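   
   For example (sketch):
   ```java
   .withDescription("kudu's primary key, primary key must be ordered");
   ```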



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@bahir.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org