Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/29 14:01:23 UTC

[GitHub] [flink] Tartarus0zm opened a new pull request, #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Tartarus0zm opened a new pull request, #20394:
URL: https://github.com/apache/flink/pull/20394

   ## What is the purpose of the change
   
   When inserting into a Hive static partition, the partition must still be added even if the job generates no data.
   
   ## Brief change log
   
   * Modify `PartitionLoader` and `FileSystemCommitter` to add the Hive partition when a static partition is used
   
   
   ## Verifying this change
   
   Added an ITCase in `HiveDialectITCase`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] luoyuxia commented on pull request #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on PR #20394:
URL: https://github.com/apache/flink/pull/20394#issuecomment-1204734039

   LGTM.




[GitHub] [flink] luoyuxia commented on a diff in pull request #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20394:
URL: https://github.com/apache/flink/pull/20394#discussion_r937290113


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##########
@@ -441,6 +441,61 @@ public void testCustomPartitionCommitPolicy() throws Exception {
         testStreamingWriteWithCustomPartitionCommitPolicy(TestCustomCommitPolicy.class.getName());
     }
 
+    @Test
+    public void testAddEmptyPartition() throws Exception {

Review Comment:
   nit:
   ```suggestion
       public void testWritingNoDataToPartition() throws Exception {
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -86,6 +86,32 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
         overwriteAndMoveFiles(srcDirs, tableLocation);
     }
 
+    /**
+     * The flink job does not write data to the partition, but the corresponding partition needs to
+     * be created.

Review Comment:
   nit:
   ```suggestion
        * be created or updated.
   ```





[GitHub] [flink] luoyuxia commented on pull request #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on PR #20394:
URL: https://github.com/apache/flink/pull/20394#issuecomment-1204847934

   @wuchong Could you please help review & merge?




[GitHub] [flink] luoyuxia commented on a diff in pull request #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20394:
URL: https://github.com/apache/flink/pull/20394#discussion_r935043184


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -57,25 +57,31 @@ class FileSystemCommitter implements Serializable {
     private final boolean overwrite;
     private final Path tmpPath;
     private final int partitionColumnSize;
+    private final LinkedHashMap<String, String> staticPartitions;
 
     FileSystemCommitter(
             FileSystemFactory factory,
             TableMetaStoreFactory metaStoreFactory,
             boolean overwrite,
             Path tmpPath,
-            int partitionColumnSize) {
+            int partitionColumnSize,
+            LinkedHashMap<String, String> staticPartitions) {
         this.factory = factory;
         this.metaStoreFactory = metaStoreFactory;
         this.overwrite = overwrite;
         this.tmpPath = tmpPath;
         this.partitionColumnSize = partitionColumnSize;
+        this.staticPartitions = staticPartitions;
     }
 
     /** For committing job's output after successful batch job completion. */
     public void commitPartitions() throws Exception {
         FileSystem fs = factory.create(tmpPath.toUri());
         List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath);
-
+        if (taskPaths.isEmpty() && !staticPartitions.isEmpty()) {

Review Comment:
   Could you please move this logic into the block below, since writing no data to a partition is just a special case of it?
   ```java
   if (partitionColumnSize > 0) {
   xxx
   }
   ```
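
A minimal sketch of the structure this comment asks for, assuming the empty-partition check sits inside the `partitionColumnSize > 0` branch. `CommitSketch` and its string "actions" are hypothetical stand-ins for illustration only; the real `FileSystemCommitter` delegates to a `PartitionLoader` and works on file system paths, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

/** Hypothetical, simplified model of the commit branching; not Flink's FileSystemCommitter. */
class CommitSketch {

    /** Returns the load actions the sketch would take instead of touching a file system. */
    static List<String> commit(
            List<String> taskPaths,
            int partitionColumnSize,
            LinkedHashMap<String, String> staticPartitions) {
        List<String> actions = new ArrayList<>();
        if (partitionColumnSize > 0) {
            if (taskPaths.isEmpty() && !staticPartitions.isEmpty()) {
                // Special case: nothing was written, but a static partition was requested.
                actions.add("loadEmptyPartition:" + staticPartitions);
            } else {
                // Simplification: one load per task path; the real code groups
                // paths by partition spec first.
                for (String path : taskPaths) {
                    actions.add("loadPartition:" + path);
                }
            }
        } else {
            // Non-partitioned table: all task output goes to the table location.
            actions.add("loadNonPartition:" + taskPaths);
        }
        return actions;
    }
}
```

With an empty task-path list and a static spec such as `{dt=2022-07-29}`, the sketch records a single `loadEmptyPartition` action, so the "no data" case is handled as one branch of the partitioned path rather than as a separate early exit.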



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -78,6 +78,31 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
         overwriteAndRenameFiles(srcDirs, tableLocation);
     }
 
+    /**
+     * The flink job does not generate data, but the corresponding partition needs to be created.
+     *
+     * <p>The partition does not exist, create it.
+     *
+     * <p>The partition exists:
+     *
+     * <pre>
+     *      if overwrite is true, delete the path, then create it;
+     *      if overwrite is false, do nothing;
+     * </pre>
+     */
+    public void loadEmptyPartition(LinkedHashMap<String, String> partSpec) throws Exception {
+        Optional<Path> pathFromMeta = metaStore.getPartition(partSpec);
+        if (pathFromMeta.isPresent() && !overwrite) {
+            return;
+        }
+        Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
+        if (pathFromMeta.isPresent() && overwrite) {

Review Comment:
   Isn't `overwrite` redundant here?
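
To illustrate why the second `overwrite` check adds nothing, here is a minimal in-memory sketch of the same branch order. `EmptyPartitionSketch`, its map-backed "metastore", and the returned labels are hypothetical and only model the control flow; they are not Flink's `PartitionLoader` or `TableMetaStore` API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of the branch logic; not Flink's PartitionLoader. */
class EmptyPartitionSketch {
    // Maps a partition spec such as {dt=2022-07-29} to a pretend location.
    private final Map<LinkedHashMap<String, String>, String> metaStore = new HashMap<>();
    private final boolean overwrite;

    EmptyPartitionSketch(boolean overwrite) {
        this.overwrite = overwrite;
    }

    /** Returns a label describing what the sketch did with the partition. */
    String loadEmptyPartition(LinkedHashMap<String, String> partSpec) {
        // Copy the key so later mutation of the caller's map cannot corrupt the HashMap.
        LinkedHashMap<String, String> key = new LinkedHashMap<>(partSpec);
        Optional<String> pathFromMeta = Optional.ofNullable(metaStore.get(key));
        if (pathFromMeta.isPresent() && !overwrite) {
            return "kept"; // existing partition, overwrite disabled: do nothing
        }
        // Past the early return, an existing partition implies overwrite == true,
        // so testing isPresent() alone is sufficient here.
        if (pathFromMeta.isPresent()) {
            metaStore.remove(key); // stands in for deleting the old path
            metaStore.put(key, "recreated");
            return "recreated";
        }
        metaStore.put(key, "created");
        return "created";
    }
}
```

Calling `loadEmptyPartition` twice on the same spec yields "created" then "kept" when `overwrite` is false, and "created" then "recreated" when it is true, which matches the three cases listed in the Javadoc above.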



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -78,6 +78,31 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
         overwriteAndRenameFiles(srcDirs, tableLocation);
     }
 
+    /**
+     * The flink job does not generate data, but the corresponding partition needs to be created.

Review Comment:
   ```suggestion
        * The flink job does not write data to the partition, but the corresponding partition needs to be created.
   ```



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -61,7 +61,12 @@ private void createFile(java.nio.file.Path parent, String path, String... files)
     void testPartition() throws Exception {

Review Comment:
   Please also add tests for committing an empty partition here.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java:
##########
@@ -940,6 +940,100 @@ public void testUnsupportedOperation() {
         }
     }
 
+    @Test
+    public void testInsertIntoNotExistStaticPartitionWithoutData() throws Exception {

Review Comment:
   I think these tests can be moved to `HiveTableSinkITCase` and combined into a single test.
   



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:
##########
@@ -104,12 +104,14 @@ public void testOverwriteWithEmptySource() throws Exception {
                             tableEnv.executeSql("select * from destp order by x").collect());
             assertThat(results.toString()).isEqualTo("[+I[1, 1], +I[2, 2]]");
             // static partitioned table
+            // see FLINK-28720, The semantics of overwrite is to overwrite the original data, so the

Review Comment:
   I think `see FLINK-28720,` can be removed.





[GitHub] [flink] wuchong merged pull request #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Posted by GitBox <gi...@apache.org>.
wuchong merged PR #20394:
URL: https://github.com/apache/flink/pull/20394




[GitHub] [flink] flinkbot commented on pull request #20394: [FLINK-28720][hive][connectors] Add Hive partition when flink has no data to write

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20394:
URL: https://github.com/apache/flink/pull/20394#issuecomment-1199344664

   ## CI report:
   
   * 29cd5e4c328d77b17ab1400969b86e49ce6f4d7f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

