Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/15 09:59:31 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

tsreaper opened a new pull request, #158:
URL: https://github.com/apache/flink-table-store/pull/158

   Now that the table store schema is supported, we should use it as the ground truth. The Hive DDL should only be used for validation.
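
   A minimal sketch of the intended check, in plain Java. It is an illustration only: the real patch works with Hive `TypeInfo` objects and the table store `Schema` class, whereas the helper below assumes both sides have already been flattened into parallel name/type string lists.

```java
import java.util.List;

/** Illustrative check: Hive DDL columns must agree with the table store schema. */
public class SchemaCheckSketch {

    static void checkSchemaMatched(
            List<String> hiveNames,
            List<String> hiveTypes,
            List<String> schemaNames,
            List<String> schemaTypes) {
        // The schema file on disk is the ground truth; the Hive DDL must agree.
        if (hiveNames.size() != schemaNames.size()) {
            throw new IllegalArgumentException(
                    "Hive DDL declares " + hiveNames.size() + " columns but the "
                            + "table store schema has " + schemaNames.size());
        }
        for (int i = 0; i < hiveNames.size(); i++) {
            if (!hiveNames.get(i).equalsIgnoreCase(schemaNames.get(i))
                    || !hiveTypes.get(i).equalsIgnoreCase(schemaTypes.get(i))) {
                throw new IllegalArgumentException(
                        "Column " + i + " mismatch: Hive DDL declares "
                                + hiveNames.get(i) + " " + hiveTypes.get(i)
                                + ", but the schema has "
                                + schemaNames.get(i) + " " + schemaTypes.get(i));
            }
        }
    }
}
```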


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi merged pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #158:
URL: https://github.com/apache/flink-table-store/pull/158




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898751335


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java:
##########
@@ -45,23 +37,13 @@
 public class TableStoreJobConf {
 
     private static final String TBLPROPERTIES_PREFIX = "table-store.";
-    private static final String TBLPROPERTIES_PRIMARY_KEYS = TBLPROPERTIES_PREFIX + "primary-keys";
     private static final String INTERNAL_TBLPROPERTIES_PREFIX =
             "table-store.internal.tblproperties.";
 
     private static final String INTERNAL_DB_NAME = "table-store.internal.db-name";

Review Comment:
   `getDbName` is never used.



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java:
##########
@@ -45,23 +37,13 @@
 public class TableStoreJobConf {
 
     private static final String TBLPROPERTIES_PREFIX = "table-store.";
-    private static final String TBLPROPERTIES_PRIMARY_KEYS = TBLPROPERTIES_PREFIX + "primary-keys";
     private static final String INTERNAL_TBLPROPERTIES_PREFIX =
             "table-store.internal.tblproperties.";
 
     private static final String INTERNAL_DB_NAME = "table-store.internal.db-name";
     private static final String INTERNAL_TABLE_NAME = "table-store.internal.table-name";

Review Comment:
   `getTableName` is never used.





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898626041


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -18,50 +18,67 @@
 
 package org.apache.flink.table.store.hive;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 /** Column names, types and comments of a Hive table. */
 public class HiveSchema {
 
-    private final List<String> fieldNames;
-    private final List<TypeInfo> fieldTypeInfos;
+    private static final String TBLPROPERTIES_PREFIX = "table-store.";
+    private static final String TBLPROPERTIES_PRIMARY_KEYS = TBLPROPERTIES_PREFIX + "primary-keys";

Review Comment:
   We need to check that the primary keys in the Hive DDL match the primary keys in the schema. Since Hive does not support primary keys, we have to declare them through table properties.
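
   A rough sketch of how such a TBLPROPERTIES-based check could look. The property name is taken from the diff above; the method name and trimming behavior are illustrative, not the actual implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** Sketch of validating primary keys declared through Hive TBLPROPERTIES. */
public class PrimaryKeyCheckSketch {

    // Property name as in the diff above.
    private static final String TBLPROPERTIES_PRIMARY_KEYS = "table-store.primary-keys";

    static void checkPrimaryKeys(Properties tblProperties, List<String> schemaPrimaryKeys) {
        String declared = tblProperties.getProperty(TBLPROPERTIES_PRIMARY_KEYS);
        if (declared == null) {
            return; // nothing declared in the Hive DDL, nothing to validate
        }
        List<String> declaredKeys =
                Arrays.stream(declared.split(","))
                        .map(String::trim)
                        .collect(Collectors.toList());
        if (!declaredKeys.equals(schemaPrimaryKeys)) {
            throw new IllegalArgumentException(
                    "Primary keys " + declaredKeys + " in TBLPROPERTIES do not match "
                            + "the table store schema's primary keys " + schemaPrimaryKeys);
        }
    }
}
```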





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898744565


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -64,20 +67,104 @@ public List<String> fieldComments() {
 
     /** Extract {@link HiveSchema} from Hive serde properties. */
     public static HiveSchema extract(Properties properties) {
-        String columnNames = properties.getProperty(serdeConstants.LIST_COLUMNS);
-        String columnNameDelimiter =
-                properties.getProperty(
-                        serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
-        List<String> names = Arrays.asList(columnNames.split(columnNameDelimiter));
+        String location = properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+        if (location == null) {
+            String tableName = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+            throw new UnsupportedOperationException(
+                    "Location property is missing for table "
+                            + tableName
+                            + ". Currently Flink table store only supports external table for Hive "
+                            + "so location property must be set.");
+        }
+        Schema schema =
+                new SchemaManager(new Path(location))
+                        .latest()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Schema file not found in location "
+                                                        + location
+                                                        + ". Please create table first."));
 
-        String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
-        List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+        if (properties.containsKey(serdeConstants.LIST_COLUMNS)
+                && properties.containsKey(serdeConstants.LIST_COLUMN_TYPES)) {
+            String columnNames = properties.getProperty(serdeConstants.LIST_COLUMNS);
+            String columnNameDelimiter =
+                    properties.getProperty(
+                            serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
+            List<String> names = Arrays.asList(columnNames.split(columnNameDelimiter));
+
+            String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+            List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+
+            if (names.size() > 0 && typeInfos.size() > 0) {
+                checkSchemaMatched(names, typeInfos, schema);
+            }
+        }
 
         // see MetastoreUtils#addCols for the exact property name and separator
         String columnCommentsPropertyName = "columns.comments";
         List<String> comments =
-                Arrays.asList(properties.getProperty(columnCommentsPropertyName).split("\0", -1));
+                new ArrayList<>(
+                        Arrays.asList(
+                                properties.getProperty(columnCommentsPropertyName).split("\0")));
+        while (comments.size() < schema.fields().size()) {
+            comments.add("");
+        }
+
+        return new HiveSchema(schema, comments);

Review Comment:
   Comments can be extracted from the schema: see `description` in `DataField`.
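
   A self-contained sketch of that suggestion, with a stand-in class for the table store's `DataField` (only the nullable description matters here):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch: derive Hive column comments from schema field descriptions. */
public class FieldCommentsSketch {

    /** Stand-in for the table store's DataField. */
    static final class Field {
        final String name;
        final String description; // may be null

        Field(String name, String description) {
            this.name = name;
            this.description = description;
        }
    }

    /**
     * Null descriptions become empty strings so the comment list stays
     * aligned with the column list, like the while-loop padding above.
     */
    static List<String> commentsFromSchema(List<Field> fields) {
        return fields.stream()
                .map(f -> f.description == null ? "" : f.description)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Field> fields =
                Arrays.asList(new Field("id", "primary key"), new Field("name", null));
        System.out.println(commentsFromSchema(fields)); // [primary key, ]
    }
}
```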





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898743575


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java:
##########
@@ -87,39 +77,15 @@ public static void configureInputJobProperties(Properties properties, Map<String
                 INTERNAL_LOCATION,
                 properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
 
-        map.put(
-                INTERNAL_COLUMN_NAMES,
-                properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS));
-
+        HiveSchema hiveSchema = HiveSchema.extract(properties);
+        map.put(INTERNAL_COLUMN_NAMES, String.join(INTERNAL_SEPARATOR, hiveSchema.fieldNames()));
         List<String> serializedLogicalTypes =
-                TypeInfoUtils.getTypeInfosFromTypeString(
-                                properties.getProperty(
-                                        hive_metastoreConstants.META_TABLE_COLUMN_TYPES))
-                        .stream()
-                        .map(
-                                t ->
-                                        HiveTypeUtils.typeInfoToDataType(t)
-                                                .getLogicalType()
-                                                .asSerializableString())
+                hiveSchema.fieldTypes().stream()
+                        .map(LogicalType::asSerializableString)
                         .collect(Collectors.toList());
-        map.put(INTERNAL_COLUMN_TYPES, String.join(COLUMN_TYPES_SEPARATOR, serializedLogicalTypes));
-
-        if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {
-            map.put(
-                    INTERNAL_PARTITION_COLUMN_NAMES,
-                    properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS));
-        }
-
-        if (properties.containsKey(TBLPROPERTIES_PRIMARY_KEYS)) {
-            map.put(INTERNAL_PRIMARY_KEYS, properties.getProperty(TBLPROPERTIES_PRIMARY_KEYS));
-        }
-
-        for (ConfigOption<?> option : FileStoreOptions.allOptions()) {
-            if (properties.containsKey(TBLPROPERTIES_PREFIX + option.key())) {
-                map.put(
-                        INTERNAL_TBLPROPERTIES_PREFIX + option.key(),
-                        properties.getProperty(TBLPROPERTIES_PREFIX + option.key()));
-            }
+        map.put(INTERNAL_COLUMN_TYPES, String.join(INTERNAL_SEPARATOR, serializedLogicalTypes));

Review Comment:
   Any update? Just `map.put("schema", schema.toJson())`.
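
   A sketch of the shape this suggestion implies: one serialized schema property replaces the separate name/type lists. The `table-store.internal.schema` key and the JSON payload are assumptions for illustration, not the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: ship the whole schema through the job conf as one JSON property. */
public class SchemaJsonConfSketch {

    // Hypothetical key, following the "table-store.internal." prefix
    // used elsewhere in TableStoreJobConf.
    private static final String INTERNAL_SCHEMA = "table-store.internal.schema";

    static void configure(Map<String, String> jobProperties, String schemaJson) {
        // One property replaces INTERNAL_COLUMN_NAMES, INTERNAL_COLUMN_TYPES, etc.
        jobProperties.put(INTERNAL_SCHEMA, schemaJson);
    }

    static String schemaJson(Map<String, String> jobProperties) {
        return jobProperties.get(INTERNAL_SCHEMA);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        configure(conf, "{\"fields\":[{\"name\":\"id\",\"type\":\"INT NOT NULL\"}]}");
        System.out.println(schemaJson(conf));
    }
}
```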





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898743212


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -73,11 +90,134 @@ public static HiveSchema extract(Properties properties) {
         String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
         List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
 
+        List<String> partitionKeys = new ArrayList<>();
+        if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {

Review Comment:
   I think the better way is to disallow column definitions in the Hive external table DDL altogether.
   Otherwise the two sides can get out of alignment, and there is a lot of behavior that needs to be explained.
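
   A sketch of what that stricter rule could look like: reject the DDL outright if it declares any columns or partitions. The property keys are the standard Hive metastore names; the exception wording is illustrative.

```java
import java.util.Properties;

/** Sketch of the stricter alternative: no columns or partitions in Hive DDL. */
public class EmptyDdlCheckSketch {

    static void assertNoColumnDefinitions(Properties tblProperties) {
        // "columns" and "partition_columns" are the values of
        // hive_metastoreConstants.META_TABLE_COLUMNS and
        // hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS.
        String columns = tblProperties.getProperty("columns", "");
        String partitions = tblProperties.getProperty("partition_columns", "");
        if (!columns.isEmpty() || !partitions.isEmpty()) {
            throw new IllegalArgumentException(
                    "Columns and partitions must not be declared in the Hive DDL; "
                            + "the table store schema is the single source of truth.");
        }
    }
}
```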





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r897856853


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -18,50 +18,67 @@
 
 package org.apache.flink.table.store.hive;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 /** Column names, types and comments of a Hive table. */
 public class HiveSchema {
 
-    private final List<String> fieldNames;
-    private final List<TypeInfo> fieldTypeInfos;
+    private static final String TBLPROPERTIES_PREFIX = "table-store.";
+    private static final String TBLPROPERTIES_PRIMARY_KEYS = TBLPROPERTIES_PREFIX + "primary-keys";

Review Comment:
   What is `TBLPROPERTIES_PRIMARY_KEYS` for? Nobody reads it.



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java:
##########
@@ -87,39 +77,15 @@ public static void configureInputJobProperties(Properties properties, Map<String
                 INTERNAL_LOCATION,
                 properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
 
-        map.put(
-                INTERNAL_COLUMN_NAMES,
-                properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS));
-
+        HiveSchema hiveSchema = HiveSchema.extract(properties);
+        map.put(INTERNAL_COLUMN_NAMES, String.join(INTERNAL_SEPARATOR, hiveSchema.fieldNames()));
         List<String> serializedLogicalTypes =
-                TypeInfoUtils.getTypeInfosFromTypeString(
-                                properties.getProperty(
-                                        hive_metastoreConstants.META_TABLE_COLUMN_TYPES))
-                        .stream()
-                        .map(
-                                t ->
-                                        HiveTypeUtils.typeInfoToDataType(t)
-                                                .getLogicalType()
-                                                .asSerializableString())
+                hiveSchema.fieldTypes().stream()
+                        .map(LogicalType::asSerializableString)
                         .collect(Collectors.toList());
-        map.put(INTERNAL_COLUMN_TYPES, String.join(COLUMN_TYPES_SEPARATOR, serializedLogicalTypes));
-
-        if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {
-            map.put(
-                    INTERNAL_PARTITION_COLUMN_NAMES,
-                    properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS));
-        }
-
-        if (properties.containsKey(TBLPROPERTIES_PRIMARY_KEYS)) {
-            map.put(INTERNAL_PRIMARY_KEYS, properties.getProperty(TBLPROPERTIES_PRIMARY_KEYS));
-        }
-
-        for (ConfigOption<?> option : FileStoreOptions.allOptions()) {
-            if (properties.containsKey(TBLPROPERTIES_PREFIX + option.key())) {
-                map.put(
-                        INTERNAL_TBLPROPERTIES_PREFIX + option.key(),
-                        properties.getProperty(TBLPROPERTIES_PREFIX + option.key()));
-            }
+        map.put(INTERNAL_COLUMN_TYPES, String.join(INTERNAL_SEPARATOR, serializedLogicalTypes));

Review Comment:
   Can we just use a schema JSON?



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -73,11 +90,134 @@ public static HiveSchema extract(Properties properties) {
         String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
         List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
 
+        List<String> partitionKeys = new ArrayList<>();
+        if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {

Review Comment:
   I am wondering why we need to check this.
   Can we just assert that there are no partition or column definitions?





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #158: [FLINK-28079] Check Hive DDL against table store schema when creating table

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898626398


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -73,11 +90,134 @@ public static HiveSchema extract(Properties properties) {
         String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
         List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
 
+        List<String> partitionKeys = new ArrayList<>();
+        if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {

Review Comment:
   Why not? We can support partitions now that we have the real schema.


