Posted to issues@flink.apache.org by "zhangjun0x01 (via GitHub)" <gi...@apache.org> on 2023/03/01 03:30:32 UTC

[GitHub] [flink-table-store] zhangjun0x01 opened a new pull request, #547: [FLINK-31128] Add Create Table As for flink table store

zhangjun0x01 opened a new pull request, #547:
URL: https://github.com/apache/flink-table-store/pull/547

   Add Create Table As for flink table store


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1117933201


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -123,6 +123,40 @@ public TableSchema createTable(Schema schema) throws Exception {
             Map<String, String> options = schema.options();
             int highestFieldId = RowType.currentHighestFieldId(fields);
 
+            List<String> columnNames =
+                    schema.fields().stream().map(DataField::name).collect(Collectors.toList());
+            if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+                if (!primaryKeys.isEmpty()) {

Review Comment:
   The validation should be added into `SchemaValidation#validateTableSchema`. Meanwhile, add unit tests for this validation.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112563412


##########
flink-table-store-connector/pom.xml:
##########
@@ -151,6 +151,27 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>

Review Comment:
   Could the scope be set to `test`?





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1111671280


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,78 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+Tables can also be created and populated by the results of a query. 
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create the target table first and then start an `INSERT INTO target_table SELECT * FROM source_table` job,

Review Comment:
   Yes, we should explain the difference.





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1111669015


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,78 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+Tables can also be created and populated by the results of a query. 
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create the target table first and then start an `INSERT INTO target_table SELECT * FROM source_table` job;
+// in batch mode the job will exit when it finishes, while in streaming mode the job will not exit.

Review Comment:
   For Spark there is no problem: after the operation we can read the correct data from the target table. But for Flink the default execution mode is streaming, so the checkpoint needs to be enabled. If the user forgets to enable the checkpoint, no data can be read from the target table, and the user will be confused about why the same operation works for Spark but not for Flink, so I added the explanation.
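   
   As a minimal illustration of the point (table names are placeholders; `execution.checkpointing.interval` is the standard Flink option for this):
   
   ```sql
   -- Enable checkpointing first; in streaming mode, data is only committed
   -- on checkpoints, so without this the target table stays empty.
   SET 'execution.checkpointing.interval' = '10 s';
   
   CREATE TABLE target_table AS SELECT * FROM source_table;
   ```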





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112586512


##########
flink-table-store-connector/pom.xml:
##########
@@ -151,6 +151,27 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>

Review Comment:
   I also set the scope to `test` at first, but it still threw an exception, so I updated it to `provided`. I will try the `test` scope again.





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112587410


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,79 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+A table can be created and populated by the results of a query. For example, given a SQL statement like `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
+the resulting table `table_b` will be equivalent to creating the table and inserting the data with the following statements:
+`CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;`
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// For streaming mode, you need to enable the checkpoint.

Review Comment:
   I updated it to `/* xxxxx */`; it works fine on my computer. What about you?





[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#issuecomment-1439381803

   > Thanks @zhangjun0x01 , at present, the biggest problem with CTAS is that it cannot specify the primary key and partition column, which makes it almost unavailable for production. Or can we consider supporting them?
   
   OK, I will add the functionality later.




[GitHub] [flink-table-store] JingsongLi merged pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #547:
URL: https://github.com/apache/flink-table-store/pull/547




[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112631890


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,79 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+A table can be created and populated by the results of a query. For example, given a SQL statement like `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
+the resulting table `table_b` will be equivalent to creating the table and inserting the data with the following statements:
+`CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;`
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// For streaming mode, you need to enable the checkpoint.

Review Comment:
   LGTM.





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112047870


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,78 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+Tables can also be created and populated by the results of a query. 
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create the target table first and then start an `INSERT INTO target_table SELECT * FROM source_table` job;
+// in batch mode the job will exit when it finishes, while in streaming mode the job will not exit.

Review Comment:
   @SteNicholas thanks for your suggestion, I updated it following your comment.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1111378087


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,78 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+Tables can also be created and populated by the results of a query. 
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create the target table first and then start an `INSERT INTO target_table SELECT * FROM source_table` job,

Review Comment:
   Please explain the use case for `CREATE TABLE AS` and describe how it differs from `CREATE TABLE LIKE`.
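   
   For reference, a minimal sketch of the two statements being compared (table names are placeholders, and the exact `LIKE` clause syntax varies by engine):
   
   ```sql
   -- CREATE TABLE AS derives the schema from the query and copies the data:
   CREATE TABLE table_b AS SELECT id, name FROM table_a;
   
   -- CREATE TABLE LIKE copies the table definition (schema, options) but no data:
   CREATE TABLE table_c LIKE table_a;
   ```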





[GitHub] [flink-table-store] zhangjun0x01 closed pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 closed pull request #547: [FLINK-31128] Add Create Table As for flink table store
URL: https://github.com/apache/flink-table-store/pull/547




[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1117933738


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,126 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+A table can be created and populated by the results of a query. For example, given a SQL statement like `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
+the resulting table `table_b` will be equivalent to creating the table and inserting the data with the following statements:

Review Comment:
   Add a NOTE about using `primary-key` and `partition`.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1112563022


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,79 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+A table can be created and populated by the results of a query. For example, given a SQL statement like `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
+the resulting table `table_b` will be equivalent to creating the table and inserting the data with the following statements:
+`CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;`
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// For streaming mode, you need to enable the checkpoint.

Review Comment:
   Could this use a NOTE to explain it? The `//` doesn't look good to me.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1117933424


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,20 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Define primary key by table options, cannot define primary key on DDL and table options at the same time.");

Review Comment:
   In which case would the user define the primary key in both the DDL and the table options? IMO, you need to explain the case clearly.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1111831443


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,78 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+Tables can also be created and populated by the results of a query. 
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create the target table first and then start an `INSERT INTO target_table SELECT * FROM source_table` job;
+// in batch mode the job will exit when it finishes, while in streaming mode the job will not exit.

Review Comment:
   @zhangjun0x01, because the explanation is in the `Flink` tab, you could explain that the checkpoint needs to be enabled for streaming mode; there is no need to explain the job behavior. BTW, you could add a `Spark3` tab for `Create Table As`, because you have added the test case of `Create Table As` in `SparkReadITCase`.
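   
   For reference, the Spark statements exercised in `SparkReadITCase` look like this:
   
   ```sql
   CREATE TABLE default.testCreateTableAs AS SELECT * FROM default.testCreateTable;
   
   -- with a partition carried over via table properties:
   CREATE TABLE default.partitionedTableAs TBLPROPERTIES ('partition' = 'a')
   AS SELECT * FROM default.partitionedTable;
   ```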





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1118773693


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##########
@@ -123,6 +123,40 @@ public TableSchema createTable(Schema schema) throws Exception {
             Map<String, String> options = schema.options();
             int highestFieldId = RowType.currentHighestFieldId(fields);
 
+            List<String> columnNames =
+                    schema.fields().stream().map(DataField::name).collect(Collectors.toList());
+            if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+                if (!primaryKeys.isEmpty()) {

Review Comment:
   I tried to move the validation to `SchemaValidation#validateTableSchema`, but I found that the main purpose of that method is validation, while here we also need to modify the values of `partition` and `primary-key`, so I think `SchemaManager#createTable` is more appropriate. What do you think?





[GitHub] [flink-table-store] zhangjun0x01 commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1118244978


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,20 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Define primary key by table options, cannot define primary key on DDL and table options at the same time.");

Review Comment:
   For Spark, the syntax for specifying partitions and primary keys is the same in `create table` and `create table as select`.
   
   But for Flink, `create table as select` cannot specify the partition and primary key, so we use the option parameters to specify them. However, if the user writes the following SQL:
   ```
   CREATE TABLE MyTable (
       user_id BIGINT,
       item_id BIGINT,
       behavior STRING,
       dt STRING,
       hh STRING,
       PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
   ) WITH ('primary-key' = 'dt');
   
   ```
   we cannot determine which primary key to use. Therefore, we added validation to prevent the primary key and partition from appearing in the DDL and the options at the same time.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#issuecomment-1449196021

   > It seems that some comments are not resolved. @zhangjun0x01
   
   Hi @JingsongLi, I have addressed the comments, but when I run the failing UT `org.apache.flink.table.store.file.operation.FileStoreCommitTest` on my computer, it passes.




[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#issuecomment-1439919706

   > > Thanks @zhangjun0x01 , at present, the biggest problem with CTAS is that it cannot specify the primary key and partition column, which makes it almost unavailable for production. Or can we consider supporting them?
   > 
   > @zhangjun0x01 The core problem is that Flink SQL CTAS does not support pk/partition key declaration. Maybe we can provide options for the primary key and partition key, just like the Spark create table for table store. What do you think?
   
   I tested it in Hive; it does not support this feature either, so I didn't find a reference implementation. One way to implement it is to add options, as you said. The other is to modify the `SqlCreateTableAs` parser syntax to add primary key and partition support.
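   
   As a sketch, the options-based approach would look like this, using the `primary-key` and `partition` options this PR introduces (table and column names are placeholders):
   
   ```sql
   -- Declare the primary key and partition through table options,
   -- since Flink CTAS has no PRIMARY KEY / PARTITIONED BY clause:
   CREATE TABLE target_table WITH (
       'primary-key' = 'dt,hh,user_id',
       'partition' = 'dt'
   ) AS SELECT * FROM source_table;
   ```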




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1116731188


##########
flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java:
##########
@@ -214,6 +221,53 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
             catalogTable = catalogTable.copy(options);
         }
 
+        if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {

Review Comment:
   Could this logic be in `SchemaManager.createTable`? Spark does not have this code to take care of the partition.



##########
flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java:
##########
@@ -131,6 +131,137 @@ public void testCreateTable() {
                                 + "{\"id\":2,\"name\":\"c\",\"type\":\"CHAR(10)\"}]]]");
     }
 
+    @Test
+    public void testCreateTableAs() {
+        spark.sql(
+                "CREATE TABLE default.testCreateTable(\n"
+                        + "a BIGINT,\n"
+                        + "b VARCHAR(10),\n"
+                        + "c CHAR(10))");
+        spark.sql("INSERT INTO default.testCreateTable VALUES(1,'a','b')");
+        spark.sql(
+                "CREATE TABLE default.testCreateTableAs AS SELECT * FROM default.testCreateTable");
+        List<Row> result = spark.sql("SELECT * FROM default.testCreateTableAs").collectAsList();
+        assertThat(result.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]");
+
+        // partitioned table
+        spark.sql(
+                "CREATE TABLE default.partitionedTable (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING,"
+                        + "c STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a,b)");
+        spark.sql("INSERT INTO default.partitionedTable VALUES(1,'aaa','bbb')");
+        spark.sql(
+                "CREATE TABLE default.partitionedTableAs TBLPROPERTIES ('partition' = 'a') AS SELECT * FROM default.partitionedTable");
+        assertThat(
+                        spark.sql("SHOW CREATE TABLE default.partitionedTableAs")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo(
+                        String.format(
+                                "[[CREATE TABLE partitionedTableAs (\n"
+                                        + "  `a` BIGINT,\n"
+                                        + "  `b` STRING,\n"
+                                        + "  `c` STRING)\n"
+                                        + "TBLPROPERTIES(\n"
+                                        + "  'partition' = 'a',\n"

Review Comment:
   It is wrong; there is no `PARTITIONED` definition.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define primary key for `CREATE TABLE AS SELECT` statement.");

Review Comment:
   `Define primary key by table options, cannot define primary key on DDL and table options at the same time.`.



##########
flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java:
##########
@@ -214,6 +221,53 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
             catalogTable = catalogTable.copy(options);
         }
 
+        if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+            ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) catalogTable;
+            ResolvedSchema oldResolvedSchema = resolvedCatalogTable.getResolvedSchema();
+            if (oldResolvedSchema.getPrimaryKey().isPresent()) {
+                throw new CatalogException(
+                        "Table with primary key cannot contain `primary-key` option.");
+            }
+
+            String primaryKeys = options.get(CoreOptions.PRIMARY_KEY.key());

Review Comment:
   Should we remove this key? The same applies to the partition.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =

Review Comment:
   immutable



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define primary key for `CREATE TABLE AS SELECT` statement.");
+
+    public static final ConfigOption<String> PARTITION =

Review Comment:
   immutable



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -474,6 +474,18 @@ public class CoreOptions implements Serializable {
                                                             + "$hour:00:00'."))
                                     .build());
 
+    public static final ConfigOption<String> PRIMARY_KEY =
+            key("primary-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define primary key for `CREATE TABLE AS SELECT` statement.");
+
+    public static final ConfigOption<String> PARTITION =
+            key("partition")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Define partition for `CREATE TABLE AS SELECT` statement.");

Review Comment:
   Same as pk







[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "SteNicholas (via GitHub)" <gi...@apache.org>.
SteNicholas commented on code in PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#discussion_r1111378285


##########
docs/content/docs/how-to/creating-tables.md:
##########
@@ -114,6 +114,78 @@ Partition keys must be a subset of primary keys if primary keys are defined.
 By configuring [partition.expiration-time]({{< ref "docs/maintenance/manage-partition" >}}), expired partitions can be automatically deleted.
 {{< /hint >}}
 
+## Create Table As
+
+Tables can also be created and populated by the results of a query. 
+
+{{< tabs "create-table-as" >}}
+
+{{< tab "Flink" >}}
+
+// Flink will create the target table first and then start an `INSERT INTO target_table SELECT * FROM source_table` job;
+// in batch mode the job will exit when it finishes, while in streaming mode the job will not exit.

Review Comment:
   There is no need to explain the job behavior for batch mode and streaming mode.





[GitHub] [flink-table-store] JingsongLi commented on pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#issuecomment-1441348097

   > > > Thanks @zhangjun0x01 , at present, the biggest problem with CTAS is that it cannot specify the primary key and partition column, which makes it almost unavailable for production. Or can we consider supporting them?
   > > 
   > > 
   > > @zhangjun0x01 The core problem is that Flink SQL CTAS does not support pk/partition key declaration. Maybe we can provide options for the primary key and partition key, just like the Spark create table for table store. What do you think?
   > 
   > I tested it in Hive; it does not support this feature either, so I didn't find a reference implementation. One way to implement it is to add options, as you said. The other is to modify the `SqlCreateTableAs` parser syntax to add primary key and partition support.
   
   Yes, I don't think we should expose the documentation before supporting pk and partition definitions.




[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#issuecomment-1443064776

   > > > > Thanks @zhangjun0x01 , at present, the biggest problem with CTAS is that it cannot specify the primary key and partition column, which makes it almost unavailable for production. Or can we consider supporting them?
   > > > 
   > > > 
   > > > @zhangjun0x01 The core problem is that Flink SQL CTAS does not support pk/partition key declaration. Maybe we can provide options for the primary key and partition key, just like the Spark create table for table store. What do you think?
   > > 
   > > 
   > > I tested it in Hive; it does not support this feature either, so I didn't find a reference implementation. One way to implement it is to add options, as you said. The other is to modify the `SqlCreateTableAs` parser syntax to add primary key and partition support.
   > 
   > Yes, I don't think we should expose the documentation before supporting pk and partition definitions.
   
   Hi @JingsongLi, I added the primary key and partition support following your comment.




[GitHub] [flink-table-store] JingsongLi commented on pull request #547: [FLINK-31128] Add Create Table As for flink table store

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #547:
URL: https://github.com/apache/flink-table-store/pull/547#issuecomment-1439554535

   > Thanks @zhangjun0x01 , at present, the biggest problem with CTAS is that it cannot specify the primary key and partition column, which makes it almost unavailable for production. Or can we consider supporting them?
   
   @zhangjun0x01 The core problem is that Flink SQL CTAS does not support pk/partition key declaration. Maybe we can provide options for the primary key and partition key, just like the Spark create table for table store. What do you think?

