You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/15 10:54:11 UTC

[GitHub] [flink] JingsongLi opened a new pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

JingsongLi opened a new pull request #12168:
URL: https://github.com/apache/flink/pull/12168


   
   ## What is the purpose of the change
   
   Integrate hive to streaming file sink.
   
   ## Brief change log
   
   - Integrate to `HadoopPathBasedBulkWriterFactory`.
   - Integrate to `StreamingFileWriter` and `StreamingFileCommitter`.
   
   ## Verifying this change
   
   TODO
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507) 
   * 3d9fae500f937919da069b2d2e973a9369e7795f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597",
       "triggerID" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae4f5726aca8dc545e6c647b77b53f6ad59f38f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8ae4f5726aca8dc545e6c647b77b53f6ad59f38f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2903876d8c6e5b693f13fa38fbb40ce7ca96419b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2903876d8c6e5b693f13fa38fbb40ce7ca96419b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca3b38d640d3e7d3590ee882c76c94e203719146 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597) 
   * 8ae4f5726aca8dc545e6c647b77b53f6ad59f38f UNKNOWN
   * 2903876d8c6e5b693f13fa38fbb40ce7ca96419b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507) 
   * 3d9fae500f937919da069b2d2e973a9369e7795f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512) 
   * 0824a047d1e106d1aa234c1659a98addb87012da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233102



##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.

Review comment:
       You can add comments in https://github.com/apache/flink/pull/12134




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507) 
   * 3d9fae500f937919da069b2d2e973a9369e7795f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597",
       "triggerID" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca3b38d640d3e7d3590ee882c76c94e203719146 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629171346


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 2c97f52f8d206393ab06a555d3510392b2638015 (Fri May 15 10:57:45 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] lirui-apache commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426141927



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveRowDataPartitionComputer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.filesystem.RowDataPartitionComputer;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+
+/**
+ * A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing the partition value strings.
+ */
+public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
+
+	private final DataFormatConverters.DataFormatConverter[] partitionConverters;
+	private final HiveObjectConversion[] partColConversions;

Review comment:
       We already have `partitionConverters`. I think we can just name this one as `hiveObjectConversions`.

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.
+ */
+public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(2000);
+
+	@Test
+	public void testWriteFile() throws Exception {
+		File file = TEMPORARY_FOLDER.newFolder();
+		Path basePath = new Path(file.toURI());
+
+		List<String> data = Arrays.asList(
+			"first line",
+			"second line",
+			"third line");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<String> stream = env.addSource(
+			new FiniteTestSource<>(data), TypeInformation.of(String.class));
+		Configuration configuration = new Configuration();
+
+		HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
+			new HadoopPathBasedBulkFormatBuilder<>(
+				basePath,
+				new TestHadoopPathBasedBulkWriterFactory(),
+				configuration,
+				new DateTimeBucketAssigner<>());
+		TestStreamingFileSinkFactory<String> streamingFileSinkFactory = new TestStreamingFileSinkFactory<>();
+		stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
+
+		env.execute();
+		validateResult(data, configuration, basePath);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void validateResult(List<String> expected, Configuration config, Path basePath) throws IOException {
+		FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
+		FileStatus[] buckets = fileSystem.listStatus(basePath);
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+		assertNotNull(partFiles);
+		assertEquals(2, partFiles.length);

Review comment:
       Why will there be two files?

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveRecordWriterFactory.java
##########
@@ -167,56 +151,65 @@ public HiveOutputFormat createOutputFormat(Path outPath) {
 				}
 			}
 
-			RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+			return hiveShim.getHiveRecordWriter(
 					conf,
 					hiveOutputFormatClz,
 					recordSerDe.getSerializedClass(),
 					isCompressed,
 					tableProperties,
-					HadoopFileSystem.toHadoopPath(outPath));
-			return new HiveOutputFormat(recordWriter);
+					path);
 		} catch (Exception e) {
 			throw new FlinkHiveException(e);
 		}
 	}
 
-	private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
-
-		private final RecordWriter recordWriter;
+	public JobConf getJobConf() {
+		return confWrapper.conf();
+	}
 
-		private HiveOutputFormat(RecordWriter recordWriter) {
-			this.recordWriter = recordWriter;
-		}
+	private void initialize() throws Exception {
+		JobConf jobConf = confWrapper.conf();
+		Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
+		Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+				"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+						+ serdeLib.getClass().getName());
+		this.recordSerDe = (Serializer) serdeLib;
+		ReflectionUtils.setConf(recordSerDe, jobConf);
 
-		// converts a Row to a list of Hive objects so that Hive can serialize it
-		private Object getConvertedRow(Row record) {
-			List<Object> res = new ArrayList<>(numNonPartitionColumns);
-			for (int i = 0; i < numNonPartitionColumns; i++) {
-				res.add(hiveConversions[i].toHiveObject(record.getField(i)));
-			}
-			return res;
-		}
+		// TODO: support partition properties, for now assume they're same as table properties
+		SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
 
-		@Override
-		public void configure(Configuration parameters) {
+		this.formatFields = allColumns.length - partitionColumns.length;
+		this.hiveConversions = new HiveObjectConversion[formatFields];
+		this.converters = new DataFormatConverter[formatFields];
+		List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+		for (int i = 0; i < formatFields; i++) {
+			DataType type = allTypes[i];
+			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+			objectInspectors.add(objectInspector);
+			hiveConversions[i] = HiveInspectors.getConversion(
+					objectInspector, type.getLogicalType(), hiveShim);
+			converters[i] = DataFormatConverters.getConverterForDataType(type);
 		}
 
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-		}
+		this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+				Arrays.asList(allColumns).subList(0, formatFields),
+				objectInspectors);
+	}
 
-		@Override
-		public void writeRecord(Row record) throws IOException {
-			try {
-				recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
-			} catch (SerDeException e) {
-				throw new IOException(e);
-			}
+	Writable toHiveWritable(Row row) throws SerDeException {

Review comment:
       It seems strange that a "record writer factory" is also responsible for converting data between Flink and Hive. Can we move these methods to somewhere else? Or perhaps this class shouldn't be a factory in the first place.

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.

Review comment:
       Doesn't seem to be a **base** class

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.connectors.hive.write.DefaultHadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedBulkWriterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedPartFileWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}.
+ */
+public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>

Review comment:
       Why is this a "format builder" instead of a "buckets builder"?

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
##########
@@ -213,6 +228,95 @@ public void testWriteNullValues() throws Exception {
 		}
 	}
 
+	@Test(timeout = 120000)
+	public void testPartStreamingWrite() throws Exception {
+		testStreamingWrite(true, (path) -> {
+			File basePath = new File(path, "d=2020-05-03");
+			Assert.assertEquals(5, basePath.list().length);
+			Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
+		});
+	}
+
+	@Test(timeout = 120000)
+	public void testNonPartStreamingWrite() throws Exception {
+		testStreamingWrite(false, (p) -> {});

Review comment:
       Does this mean success file is not written for non-partitioned tables?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
##########
@@ -20,122 +20,172 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
 /**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
+ * The {@link Bucket} uses the {@link PartFileWriter} to write element to a part file.
  */
 @Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+interface PartFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
 
-	private final BucketID bucketId;
+	/**
+	 * Write a element to the part file.
+	 * @param element the element to be written.
+	 * @param currentTime the writing time.
+	 * @throws IOException Thrown if writing the element fails.
+	 */
+	void write(final IN element, final long currentTime) throws IOException;
 
-	private final long creationTime;
+	/**
+	 * @return The state of the current part file.
+	 * @throws IOException Thrown if persisting the part file fails.
+	 */
+	InProgressFileRecoverable persist() throws IOException;
 
-	protected final RecoverableFsDataOutputStream currentPartStream;
 
-	private long lastUpdateTime;
+	/**
+	 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+	 * @throws IOException Thrown if an I/O error occurs.
+	 */
+	PendingFileRecoverable closeForCommit() throws IOException;
 
-	protected PartFileWriter(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream currentPartStream,
-			final long creationTime) {
+	/**
+	 * Dispose the part file.
+	 */
+	void dispose();
 
-		Preconditions.checkArgument(creationTime >= 0L);
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-		this.creationTime = creationTime;
-		this.lastUpdateTime = creationTime;
-	}
+	// ------------------------------------------------------------------------
 
-	abstract void write(IN element, long currentTime) throws IOException;
+	/**
+	 * An interface for factories that create the different {@link PartFileWriter writers}.
+	 */
+	interface PartFileFactory<IN, BucketID> {
 
-	RecoverableWriter.ResumeRecoverable persist() throws IOException {
-		return currentPartStream.persist();
-	}
+		/**
+		 * Used to create a new {@link PartFileWriter}.
+		 * @param bucketID the id of the bucket this writer is writing to.
+		 * @param path the path this writer will write to.
+		 * @param creationTime the creation time of the file.
+		 * @return the new {@link PartFileWriter}
+		 * @throws IOException Thrown if creating a writer fails.
+		 */
+		PartFileWriter<IN, BucketID> openNew(
+			final BucketID bucketID,
+			final Path path,
+			final long creationTime) throws IOException;
 
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-		return currentPartStream.closeForCommit().getRecoverable();
-	}
+		/**
+		 * Used to resume a {@link PartFileWriter} from a {@link InProgressFileRecoverable}.
+		 * @param bucketID the id of the bucket this writer is writing to.
+		 * @param inProgressFileSnapshot the state of the part file.
+		 * @param creationTime the creation time of the file.
+		 * @return the resumed {@link PartFileWriter}
+		 * @throws IOException Thrown if resuming a writer fails.
+		 */
+		PartFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketID,
+			final InProgressFileRecoverable inProgressFileSnapshot,
+			final long creationTime) throws IOException;
 
-	void dispose() {
-		// we can suppress exceptions here, because we do not rely on close() to
-		// flush or persist any data
-		IOUtils.closeQuietly(currentPartStream);
-	}
+		/**
+		 * Recovers a pending file for finalizing and committing.
+		 * @param pendingFileRecoverable The handle with the recovery information.
+		 * @return A pending file
+		 * @throws IOException Thrown if recovering a pending file fails.
+		 */
+		PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException;
 
-	void markWrite(long now) {
-		this.lastUpdateTime = now;
-	}
+		/**
+		 * Marks if requiring to do any additional cleanup/freeing of resources occupied
+		 * as part of a {@link InProgressFileRecoverable}.
+		 *
+		 * <p>In case cleanup is required, then {@link #cleanupInProgressFileRecoverable(InProgressFileRecoverable)} should
+		 * be called.
+		 *
+		 * @return {@code true} if cleanup is required, {@code false} otherwise.
+		 */
+		boolean requiresCleanupOfInProgressFileRecoverableState();
 
-	@Override
-	public BucketID getBucketId() {
-		return bucketId;
-	}
+		/**
+		 * Frees up any resources that were previously occupied in order to be able to
+		 * recover from a (potential) failure.
+		 *
+		 * <p><b>NOTE:</b> This operation should not throw an exception if the {@link InProgressFileRecoverable} has already
+		 * been cleaned up and the resources have been freed. But the contract is that it will throw
+		 * an {@link UnsupportedOperationException} if it is called for a {@link PartFileFactory}
+		 * whose {@link #requiresCleanupOfInProgressFileRecoverableState()} returns {@code false}.
+		 *
+		 * @param inProgressFileRecoverable the {@link InProgressFileRecoverable} whose state we want to clean-up.
+		 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+		 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+		 * @throws IOException if an I/O error occurs
+		 */
+		boolean cleanupInProgressFileRecoverable(final InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
 
-	@Override
-	public long getCreationTime() {
-		return creationTime;
-	}
 
-	@Override
-	public long getSize() throws IOException {
-		return currentPartStream.getPos();
-	}
+		/**
+		 * @return the serializer for the {@link PendingFileRecoverable}.
+		 */
+		SimpleVersionedSerializer<? extends PendingFileRecoverable> getPendingFileRecoverableSerializer();
 
-	@Override
-	public long getLastUpdateTime() {
-		return lastUpdateTime;
+		/**
+		 * @return the serializer for the {@link InProgressFileRecoverable}.
+		 */
+		SimpleVersionedSerializer<? extends InProgressFileRecoverable> getInProgressFileRecoverableSerializer();
+
+		/**
+		 * Checks whether the {@link PartFileWriter} supports resuming (appending to) files after
+		 * recovery (via the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method).
+		 *
+		 * <p>If true, then this writer supports the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method.
+		 * If false, then that method may not be supported and file can only be recovered via
+		 * {@link #recoverPendingFile(PendingFileRecoverable)}.
+		 */
+		boolean supportsResume();
 	}
 
-	// ------------------------------------------------------------------------
+	 /**
+	 * A handle can be used to recover in-progress file..
+	 */
+	interface InProgressFileRecoverable extends PendingFileRecoverable {}

Review comment:
       Can we have some comments to explain the difference between `pending file` and `in progress file`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507) 
   * 3d9fae500f937919da069b2d2e973a9369e7795f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512) 
   * 0824a047d1e106d1aa234c1659a98addb87012da UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597",
       "triggerID" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae4f5726aca8dc545e6c647b77b53f6ad59f38f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8ae4f5726aca8dc545e6c647b77b53f6ad59f38f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca3b38d640d3e7d3590ee882c76c94e203719146 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597) 
   * 8ae4f5726aca8dc545e6c647b77b53f6ad59f38f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629171346


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 2903876d8c6e5b693f13fa38fbb40ce7ca96419b (Fri Oct 16 10:35:02 UTC 2020)
   
   **Warnings:**
    * **3 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233336



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.connectors.hive.write.DefaultHadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedBulkWriterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedPartFileWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}.
+ */
+public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>

Review comment:
       The naming style is from `StreamingFileSink`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233687



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveRecordWriterFactory.java
##########
@@ -167,56 +151,65 @@ public HiveOutputFormat createOutputFormat(Path outPath) {
 				}
 			}
 
-			RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+			return hiveShim.getHiveRecordWriter(
 					conf,
 					hiveOutputFormatClz,
 					recordSerDe.getSerializedClass(),
 					isCompressed,
 					tableProperties,
-					HadoopFileSystem.toHadoopPath(outPath));
-			return new HiveOutputFormat(recordWriter);
+					path);
 		} catch (Exception e) {
 			throw new FlinkHiveException(e);
 		}
 	}
 
-	private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
-
-		private final RecordWriter recordWriter;
+	public JobConf getJobConf() {
+		return confWrapper.conf();
+	}
 
-		private HiveOutputFormat(RecordWriter recordWriter) {
-			this.recordWriter = recordWriter;
-		}
+	private void initialize() throws Exception {
+		JobConf jobConf = confWrapper.conf();
+		Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
+		Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+				"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+						+ serdeLib.getClass().getName());
+		this.recordSerDe = (Serializer) serdeLib;
+		ReflectionUtils.setConf(recordSerDe, jobConf);
 
-		// converts a Row to a list of Hive objects so that Hive can serialize it
-		private Object getConvertedRow(Row record) {
-			List<Object> res = new ArrayList<>(numNonPartitionColumns);
-			for (int i = 0; i < numNonPartitionColumns; i++) {
-				res.add(hiveConversions[i].toHiveObject(record.getField(i)));
-			}
-			return res;
-		}
+		// TODO: support partition properties, for now assume they're same as table properties
+		SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
 
-		@Override
-		public void configure(Configuration parameters) {
+		this.formatFields = allColumns.length - partitionColumns.length;
+		this.hiveConversions = new HiveObjectConversion[formatFields];
+		this.converters = new DataFormatConverter[formatFields];
+		List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+		for (int i = 0; i < formatFields; i++) {
+			DataType type = allTypes[i];
+			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+			objectInspectors.add(objectInspector);
+			hiveConversions[i] = HiveInspectors.getConversion(
+					objectInspector, type.getLogicalType(), hiveShim);
+			converters[i] = DataFormatConverters.getConverterForDataType(type);
 		}
 
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-		}
+		this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+				Arrays.asList(allColumns).subList(0, formatFields),
+				objectInspectors);
+	}
 
-		@Override
-		public void writeRecord(Row record) throws IOException {
-			try {
-				recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
-			} catch (SerDeException e) {
-				throw new IOException(e);
-			}
+	Writable toHiveWritable(Row row) throws SerDeException {

Review comment:
       I will rename to `HiveWriterFactory` and add `createRowConverter` methods.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597",
       "triggerID" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae4f5726aca8dc545e6c647b77b53f6ad59f38f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8ae4f5726aca8dc545e6c647b77b53f6ad59f38f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2903876d8c6e5b693f13fa38fbb40ce7ca96419b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1633",
       "triggerID" : "2903876d8c6e5b693f13fa38fbb40ce7ca96419b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca3b38d640d3e7d3590ee882c76c94e203719146 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597) 
   * 8ae4f5726aca8dc545e6c647b77b53f6ad59f38f UNKNOWN
   * 2903876d8c6e5b693f13fa38fbb40ce7ca96419b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1633) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0824a047d1e106d1aa234c1659a98addb87012da Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507) 
   * 3d9fae500f937919da069b2d2e973a9369e7795f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512) 
   * 0824a047d1e106d1aa234c1659a98addb87012da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233235



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
##########
@@ -20,122 +20,172 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
 /**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
+ * The {@link Bucket} uses the {@link PartFileWriter} to write element to a part file.
  */
 @Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+interface PartFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
 
-	private final BucketID bucketId;
+	/**
+	 * Write a element to the part file.
+	 * @param element the element to be written.
+	 * @param currentTime the writing time.
+	 * @throws IOException Thrown if writing the element fails.
+	 */
+	void write(final IN element, final long currentTime) throws IOException;
 
-	private final long creationTime;
+	/**
+	 * @return The state of the current part file.
+	 * @throws IOException Thrown if persisting the part file fails.
+	 */
+	InProgressFileRecoverable persist() throws IOException;
 
-	protected final RecoverableFsDataOutputStream currentPartStream;
 
-	private long lastUpdateTime;
+	/**
+	 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+	 * @throws IOException Thrown if an I/O error occurs.
+	 */
+	PendingFileRecoverable closeForCommit() throws IOException;
 
-	protected PartFileWriter(
-			final BucketID bucketId,
-			final RecoverableFsDataOutputStream currentPartStream,
-			final long creationTime) {
+	/**
+	 * Dispose the part file.
+	 */
+	void dispose();
 
-		Preconditions.checkArgument(creationTime >= 0L);
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-		this.creationTime = creationTime;
-		this.lastUpdateTime = creationTime;
-	}
+	// ------------------------------------------------------------------------
 
-	abstract void write(IN element, long currentTime) throws IOException;
+	/**
+	 * An interface for factories that create the different {@link PartFileWriter writers}.
+	 */
+	interface PartFileFactory<IN, BucketID> {
 
-	RecoverableWriter.ResumeRecoverable persist() throws IOException {
-		return currentPartStream.persist();
-	}
+		/**
+		 * Used to create a new {@link PartFileWriter}.
+		 * @param bucketID the id of the bucket this writer is writing to.
+		 * @param path the path this writer will write to.
+		 * @param creationTime the creation time of the file.
+		 * @return the new {@link PartFileWriter}
+		 * @throws IOException Thrown if creating a writer fails.
+		 */
+		PartFileWriter<IN, BucketID> openNew(
+			final BucketID bucketID,
+			final Path path,
+			final long creationTime) throws IOException;
 
-	RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-		return currentPartStream.closeForCommit().getRecoverable();
-	}
+		/**
+		 * Used to resume a {@link PartFileWriter} from a {@link InProgressFileRecoverable}.
+		 * @param bucketID the id of the bucket this writer is writing to.
+		 * @param inProgressFileSnapshot the state of the part file.
+		 * @param creationTime the creation time of the file.
+		 * @return the resumed {@link PartFileWriter}
+		 * @throws IOException Thrown if resuming a writer fails.
+		 */
+		PartFileWriter<IN, BucketID> resumeFrom(
+			final BucketID bucketID,
+			final InProgressFileRecoverable inProgressFileSnapshot,
+			final long creationTime) throws IOException;
 
-	void dispose() {
-		// we can suppress exceptions here, because we do not rely on close() to
-		// flush or persist any data
-		IOUtils.closeQuietly(currentPartStream);
-	}
+		/**
+		 * Recovers a pending file for finalizing and committing.
+		 * @param pendingFileRecoverable The handle with the recovery information.
+		 * @return A pending file
+		 * @throws IOException Thrown if recovering a pending file fails.
+		 */
+		PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException;
 
-	void markWrite(long now) {
-		this.lastUpdateTime = now;
-	}
+		/**
+		 * Marks if requiring to do any additional cleanup/freeing of resources occupied
+		 * as part of a {@link InProgressFileRecoverable}.
+		 *
+		 * <p>In case cleanup is required, then {@link #cleanupInProgressFileRecoverable(InProgressFileRecoverable)} should
+		 * be called.
+		 *
+		 * @return {@code true} if cleanup is required, {@code false} otherwise.
+		 */
+		boolean requiresCleanupOfInProgressFileRecoverableState();
 
-	@Override
-	public BucketID getBucketId() {
-		return bucketId;
-	}
+		/**
+		 * Frees up any resources that were previously occupied in order to be able to
+		 * recover from a (potential) failure.
+		 *
+		 * <p><b>NOTE:</b> This operation should not throw an exception if the {@link InProgressFileRecoverable} has already
+		 * been cleaned up and the resources have been freed. But the contract is that it will throw
+		 * an {@link UnsupportedOperationException} if it is called for a {@link PartFileFactory}
+		 * whose {@link #requiresCleanupOfInProgressFileRecoverableState()} returns {@code false}.
+		 *
+		 * @param inProgressFileRecoverable the {@link InProgressFileRecoverable} whose state we want to clean-up.
+		 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+		 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+		 * @throws IOException if an I/O error occurs
+		 */
+		boolean cleanupInProgressFileRecoverable(final InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
 
-	@Override
-	public long getCreationTime() {
-		return creationTime;
-	}
 
-	@Override
-	public long getSize() throws IOException {
-		return currentPartStream.getPos();
-	}
+		/**
+		 * @return the serializer for the {@link PendingFileRecoverable}.
+		 */
+		SimpleVersionedSerializer<? extends PendingFileRecoverable> getPendingFileRecoverableSerializer();
 
-	@Override
-	public long getLastUpdateTime() {
-		return lastUpdateTime;
+		/**
+		 * @return the serializer for the {@link InProgressFileRecoverable}.
+		 */
+		SimpleVersionedSerializer<? extends InProgressFileRecoverable> getInProgressFileRecoverableSerializer();
+
+		/**
+		 * Checks whether the {@link PartFileWriter} supports resuming (appending to) files after
+		 * recovery (via the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method).
+		 *
+		 * <p>If true, then this writer supports the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method.
+		 * If false, then that method may not be supported and file can only be recovered via
+		 * {@link #recoverPendingFile(PendingFileRecoverable)}.
+		 */
+		boolean supportsResume();
 	}
 
-	// ------------------------------------------------------------------------
+	 /**
+	 * A handle can be used to recover in-progress file..
+	 */
+	interface InProgressFileRecoverable extends PendingFileRecoverable {}

Review comment:
       pending: closed flie or opened file. But not commit.
   in progress: opened file, still need append records.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597",
       "triggerID" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0824a047d1e106d1aa234c1659a98addb87012da Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529) 
   * ca3b38d640d3e7d3590ee882c76c94e203719146 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1597) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2c97f52f8d206393ab06a555d3510392b2638015 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421) 
   * ae89a7cc9ff688be1025d5526f2f698fe46dde5f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi closed pull request #12168: [FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi closed pull request #12168:
URL: https://github.com/apache/flink/pull/12168


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ca3b38d640d3e7d3590ee882c76c94e203719146",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0824a047d1e106d1aa234c1659a98addb87012da Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529) 
   * ca3b38d640d3e7d3590ee882c76c94e203719146 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233061



##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
##########
@@ -213,6 +228,95 @@ public void testWriteNullValues() throws Exception {
 		}
 	}
 
+	@Test(timeout = 120000)
+	public void testPartStreamingWrite() throws Exception {
+		testStreamingWrite(true, (path) -> {
+			File basePath = new File(path, "d=2020-05-03");
+			Assert.assertEquals(5, basePath.list().length);
+			Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
+			Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
+		});
+	}
+
+	@Test(timeout = 120000)
+	public void testNonPartStreamingWrite() throws Exception {
+		testStreamingWrite(false, (p) -> {});

Review comment:
       Yes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233264



##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.
+ */
+public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(2000);
+
+	@Test
+	public void testWriteFile() throws Exception {
+		File file = TEMPORARY_FOLDER.newFolder();
+		Path basePath = new Path(file.toURI());
+
+		List<String> data = Arrays.asList(
+			"first line",
+			"second line",
+			"third line");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<String> stream = env.addSource(
+			new FiniteTestSource<>(data), TypeInformation.of(String.class));
+		Configuration configuration = new Configuration();
+
+		HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
+			new HadoopPathBasedBulkFormatBuilder<>(
+				basePath,
+				new TestHadoopPathBasedBulkWriterFactory(),
+				configuration,
+				new DateTimeBucketAssigner<>());
+		TestStreamingFileSinkFactory<String> streamingFileSinkFactory = new TestStreamingFileSinkFactory<>();
+		stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
+
+		env.execute();
+		validateResult(data, configuration, basePath);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void validateResult(List<String> expected, Configuration config, Path basePath) throws IOException {
+		FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
+		FileStatus[] buckets = fileSystem.listStatus(basePath);
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+		assertNotNull(partFiles);
+		assertEquals(2, partFiles.length);

Review comment:
       `FiniteTestSource` emit records twice.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12168: [WIP][FLINK-14255][hive] Integrate hive to streaming file sink

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12168:
URL: https://github.com/apache/flink/pull/12168#issuecomment-629179735


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1421",
       "triggerID" : "2c97f52f8d206393ab06a555d3510392b2638015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1507",
       "triggerID" : "ae89a7cc9ff688be1025d5526f2f698fe46dde5f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512",
       "triggerID" : "3d9fae500f937919da069b2d2e973a9369e7795f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529",
       "triggerID" : "0824a047d1e106d1aa234c1659a98addb87012da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3d9fae500f937919da069b2d2e973a9369e7795f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1512) 
   * 0824a047d1e106d1aa234c1659a98addb87012da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1529) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org