Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/12 13:29:42 UTC

[GitHub] [flink] twalthr commented on a change in pull request #17459: [FLINK-24397] Remove TableSchema usages from Flink connectors

twalthr commented on a change in pull request #17459:
URL: https://github.com/apache/flink/pull/17459#discussion_r727125318



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
##########
@@ -57,8 +58,8 @@
      * type columns in the schema. The PRIMARY KEY constraint is optional; if it exists, the primary
      * key constraint must be defined on the single row key field.
      */
-    public static void validatePrimaryKey(TableSchema schema) {
-        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+    public static void validatePrimaryKey(DataType dataType, Schema schema) {

Review comment:
       Why is `Schema` necessary here?
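
       A possible alternative shape (just a sketch, not the final signature): pass the primary key indexes together with the physical row type, so no `Schema` has to be handed in at all.

       // Sketch only (hypothetical signature): the primary key indexes replace the Schema argument.
       public static void validatePrimaryKey(DataType dataType, int[] primaryKeyIndexes) {
           // the (optional) primary key must be defined exactly on the single row key field
       }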

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
##########
@@ -80,14 +80,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         helper.validate();
         validateConfigOptions(config);
         JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 
         return new JdbcDynamicTableSink(
                 jdbcOptions,
                 getJdbcExecutionOptions(config),
-                getJdbcDmlOptions(jdbcOptions, physicalSchema),
-                physicalSchema);
+                getJdbcDmlOptions(
+                        jdbcOptions,
+                        context.getCatalogTable().getUnresolvedSchema(),

Review comment:
       can we remove passing this as well?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -225,7 +225,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
 
         final DataType physicalDataType =
-                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

Review comment:
       use the context method here and don't go through `getCatalogTable()` anymore, so this factory can serve as a reference implementation
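
       For example, a minimal sketch, assuming the context method meant here is `DynamicTableFactory.Context#getPhysicalRowDataType()`:

       // Sketch only: take the physical row type from the factory context directly
       // instead of going through getCatalogTable().getResolvedSchema().
       final DataType physicalDataType = context.getPhysicalRowDataType();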

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -311,7 +311,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 
     private static void validatePKConstraints(
             ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
-        if (catalogTable.getSchema().getPrimaryKey().isPresent()
+        if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()

Review comment:
       validate via the primary key index array from the context instead
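
       A sketch of what that could look like, assuming the index array meant here is `Context#getPrimaryKeyIndexes()` (empty when no PRIMARY KEY is defined):

       // Sketch only: primary key presence is checked via the index array,
       // without resolving the catalog table's Schema.
       final int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
       if (primaryKeyIndexes.length > 0
               && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
           // fail as before: an INSERT-only format cannot back a PRIMARY KEY constraint
       }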

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java
##########
@@ -77,21 +77,22 @@ private void validatePrimaryKey(ChangelogMode requestedMode) {
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         final TypeInformation<RowData> rowDataTypeInformation =
-                context.createTypeInformation(tableSchema.toRowDataType());
+                context.createTypeInformation(physicalRowDataType);
         final JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder();
 
         builder.setJdbcOptions(jdbcOptions);
         builder.setJdbcDmlOptions(dmlOptions);
         builder.setJdbcExecutionOptions(executionOptions);
         builder.setRowDataTypeInfo(rowDataTypeInformation);
-        builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
+        builder.setFieldDataTypes(physicalRowDataType.getChildren().toArray(new DataType[0]));

Review comment:
       nit: use `DataType.getFieldDataTypes` instead, just to be safe. `getChildren` would also return children if the type is an ARRAY or MAP, not only for ROW types.
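
       i.e. something like the following sketch (`DataType.getFieldDataTypes` being the static helper, analogous to the `DataType.getFieldNames` calls this PR already uses):

       // Sketch only: getFieldDataTypes extracts the fields of a ROW/structured type,
       // while getChildren() would also yield e.g. the element type of an ARRAY or MAP.
       builder.setFieldDataTypes(
               DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]));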

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
##########
@@ -101,50 +101,46 @@
                 createTestItem("postgresql", "ARRAY<INTEGER>"),
 
                 // Unsupported types throws errors.
-                createTestItem(
-                        "derby", "BINARY", "The Derby dialect doesn't support type: BINARY(1)."),
                 createTestItem(
                         "derby",
-                        "VARBINARY(10)",
-                        "The Derby dialect doesn't support type: VARBINARY(10)."),
+                        "BINARY",
+                        "Unsupported conversion from data type 'BINARY(1)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion."),

Review comment:
       this exception seems wrong. we should not use a conversion from data type to legacy type information anymore

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
##########
@@ -154,16 +153,17 @@ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
         return builder.build();
     }
 
-    private JdbcDmlOptions getJdbcDmlOptions(JdbcConnectorOptions jdbcOptions, TableSchema schema) {
+    private JdbcDmlOptions getJdbcDmlOptions(
+            JdbcConnectorOptions jdbcOptions, Schema schema, DataType dataType) {

Review comment:
       we can use the primary key indexes instead of `Schema`
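
       A sketch of how that could look (assuming the indexes are handed in from the factory context and that `JdbcDmlOptions.builder()` keeps its current setters):

       // Sketch only: key field names are derived from the primary key indexes and the
       // physical row type, so neither Schema nor ResolvedSchema has to be passed around.
       private JdbcDmlOptions getJdbcDmlOptions(
               JdbcConnectorOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
           final List<String> fieldNames = DataType.getFieldNames(dataType);
           final String[] keyFields =
                   Arrays.stream(primaryKeyIndexes)
                           .mapToObj(fieldNames::get)
                           .toArray(String[]::new);
           return JdbcDmlOptions.builder()
                   .withTableName(jdbcOptions.getTableName())
                   .withDialect(jdbcOptions.getDialect())
                   .withFieldNames(fieldNames.toArray(new String[0]))
                   .withKeyFields(keyFields.length > 0 ? keyFields : null)
                   .build();
       }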

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunction.java
##########
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc.table;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
-import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
-import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
-import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
-import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
-import org.apache.flink.connector.jdbc.utils.JdbcUtils;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
-
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getFieldFromResultSet;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TableFunction} to query fields from JDBC by keys. The query template like:
- *
- * <PRE>
- * SELECT c, d, e, f from T where a = ? and b = ?
- * </PRE>
- *
- * <p>Support cache the result to avoid frequent accessing to remote databases. 1.The cacheMaxSize
- * is -1 means not use cache. 2.For real-time data, you need to set the TTL of cache.
- */
-public class JdbcLookupFunction extends TableFunction<Row> {

Review comment:
       can we drop this? is there a similar `RowData` one?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -74,16 +73,16 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
             int[] innerKeyArr = context.getKeys()[i];
             Preconditions.checkArgument(
                     innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
-            keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
+            keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
         }
-        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
+        final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
 
         return TableFunctionProvider.of(
                 new JdbcRowDataLookupFunction(
                         options,
                         lookupOptions,
-                        physicalSchema.getFieldNames(),
-                        physicalSchema.getFieldDataTypes(),
+                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        physicalRowDataType.getChildren().toArray(new DataType[0]),

Review comment:
       same comment as above: use `getFieldDataTypes`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org