Posted to issues@flink.apache.org by glaksh100 <gi...@git.apache.org> on 2018/04/16 23:33:39 UTC

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

GitHub user glaksh100 opened a pull request:

    https://github.com/apache/flink/pull/5860

    [FLINK-9138][filesystem-connectors] Implement time based rollover in BucketingSink

    ## What is the purpose of the change
    
    This pull request enables a time-based rollover of the part file in the BucketingSink. This is particularly useful when write throughput is low, because it makes data available for consumption at a fixed interval.
    
    ## Brief change log
      - Add a `batchRolloverInterval` field with a setter 
      - Track a `firstWrittenToTime` for the bucket state
      - Check for `currentProcessingTime` - `firstWrittenToTime` > `batchRolloverInterval` and roll over if true
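
The change log above amounts to one extra condition in the roll decision. A minimal sketch, assuming a simplified stand-in for the bucket state (the classes `PartFileState` and `RolloverCheck` and the `bytesWritten` field are hypothetical and are not the actual `BucketingSink` code):

```java
// Hypothetical, simplified model of the roll decision; not the actual BucketingSink code.
final class PartFileState {
    long firstWrittenToTime; // processing time at which the current part file was opened
    long bytesWritten;       // assumed stand-in for the writer position of the part file

    PartFileState(long openedAt) {
        this.firstWrittenToTime = openedAt;
    }
}

final class RolloverCheck {
    private final long batchSize;             // size threshold in bytes
    private final long batchRolloverInterval; // age threshold in milliseconds

    RolloverCheck(long batchSize, long batchRolloverInterval) {
        this.batchSize = batchSize;
        this.batchRolloverInterval = batchRolloverInterval;
    }

    /** Rolls when the part file is too large or, with this change, too old. */
    boolean shouldRoll(PartFileState state, long currentProcessingTime) {
        if (state.bytesWritten > batchSize) {
            return true;
        }
        return currentProcessingTime - state.firstWrittenToTime > batchRolloverInterval;
    }
}
```

Because the comparison is strict, a part file whose age equals the interval exactly is not rolled yet; it rolls on the next check after that.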
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - Added a `testRolloverInterval` test method to the `BucketingSinkTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glaksh100/flink FLINK-9138.bucketingSinkRolloverInterval

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5860.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5860
    
----
commit fee3ba293f4db4ad2d39b4ac0f3993711da9bda6
Author: Lakshmi Gururaja Rao <lg...@...>
Date:   2018-04-16T23:31:49Z

    [FLINK-9138] Implement time based rollover of part file in BucketingSink

----


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    @fhueske  I gave it some thought and your suggestion makes sense to me. I have extended `checkForInactiveBuckets` to include the rollover check. I have also updated Javadocs in a few places:
    - Added a note in the top-level Javadocs describing the updated functionality of `checkForInactiveBuckets()`
    - Updated JavaDocs for both `setBatchRolloverInterval()` and `setInactiveBucketThreshold()`
    - Updated JavaDoc for `checkForInactiveBuckets()`
    
    Let me know if the updates make sense and thank you for reviewing!
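
The idea of folding the rollover check into the periodic inactivity check could look roughly like the sketch below. It assumes the check is driven by a recurring processing-time timer rather than by incoming records; the class `TimedBucketCheckSketch`, its `Bucket` type, and the method name `closePartFilesByTime` are hypothetical and only illustrate the combined condition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a periodic check; not the actual BucketingSink code.
final class TimedBucketCheckSketch {

    static final class Bucket {
        final long creationTime;      // when the current part file was opened
        final long lastWrittenToTime; // when the last record was written
        boolean open = true;

        Bucket(long creationTime, long lastWrittenToTime) {
            this.creationTime = creationTime;
            this.lastWrittenToTime = lastWrittenToTime;
        }
    }

    private final long inactiveBucketThreshold; // milliseconds
    private final long batchRolloverInterval;   // milliseconds
    final Map<String, Bucket> buckets = new LinkedHashMap<>();

    TimedBucketCheckSketch(long inactiveBucketThreshold, long batchRolloverInterval) {
        this.inactiveBucketThreshold = inactiveBucketThreshold;
        this.batchRolloverInterval = batchRolloverInterval;
    }

    /** Closes every open bucket that is inactive or older than the rollover interval. */
    List<String> closePartFilesByTime(long currentProcessingTime) {
        List<String> closed = new ArrayList<>();
        for (Map.Entry<String, Bucket> entry : buckets.entrySet()) {
            Bucket bucket = entry.getValue();
            boolean inactive =
                currentProcessingTime - bucket.lastWrittenToTime > inactiveBucketThreshold;
            boolean tooOld =
                currentProcessingTime - bucket.creationTime > batchRolloverInterval;
            if (bucket.open && (inactive || tooOld)) {
                bucket.open = false;
                closed.add(entry.getKey());
            }
        }
        return closed;
    }
}
```

With this shape, a part file that still receives occasional writes is closed once it exceeds the rollover interval, even if no record arrives at that moment.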



---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by narayaruna <gi...@git.apache.org>.
Github user narayaruna commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    @glaksh100 If you are using BucketingSink with S3, you might be interested in this [PR](https://github.com/apache/flink/pull/4607) as well.


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r185731699
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
     		return this;
     	}
     
    +	/**
    +	 * Sets the roll over interval in milliseconds.
    +	 *
    +	 *
    +	 * <p>When a bucket part file is older than the roll over interval, a new bucket part file is
    +	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
    +	 *
    +	 * @param batchRolloverInterval The roll over interval in milliseconds
    +	 */
    +	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
    +		this.batchRolloverInterval = batchRolloverInterval;
    +		return this;
    --- End diff --
    
    Please add checks for invalid configs like negative values.


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    Hi @glaksh100, thanks for the update!


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    Thanks for the update @glaksh100! The changes look good to me. 
    What do you think @aljoscha?


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    Merging


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r186223099
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
     		return this;
     	}
     
    +	/**
    +	 * Sets the roll over interval in milliseconds.
    +	 *
    +	 *
    +	 * <p>When a bucket part file is older than the roll over interval, a new bucket part file is
    +	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
    +	 *
    +	 * @param batchRolloverInterval The roll over interval in milliseconds
    +	 */
    +	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
    +		this.batchRolloverInterval = batchRolloverInterval;
    +		return this;
    --- End diff --
    
    Added a check for `batchRolloverInterval` to be a positive non-zero value.
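
A check of this kind might look like the following sketch. It uses plain Java and a hypothetical class `RolloverConfigSketch`; the exception type, the message, and the `Long.MAX_VALUE` default are assumptions, not the code that was merged.

```java
// Hypothetical builder-style setter with argument validation; not the merged code.
final class RolloverConfigSketch {
    // Assumed default that effectively disables time-based rollover.
    private long batchRolloverInterval = Long.MAX_VALUE;

    RolloverConfigSketch setBatchRolloverInterval(long batchRolloverInterval) {
        if (batchRolloverInterval <= 0) {
            throw new IllegalArgumentException(
                "batchRolloverInterval must be positive, but was " + batchRolloverInterval);
        }
        this.batchRolloverInterval = batchRolloverInterval;
        return this;
    }

    long getBatchRolloverInterval() {
        return batchRolloverInterval;
    }
}
```

Rejecting the value in the setter surfaces a misconfiguration when the sink is built, before the job runs.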


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r185732573
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
     					subtaskIndex,
     					writePosition,
     					batchSize);
    +			} else {
    +				long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
    --- End diff --
    
    You should pass in the current processing time to avoid calling this somewhat costly method twice.


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    Hi @glaksh100, 
    
    I just noticed that the bucket closing check is only done when a record is written. Hence, inactive buckets might not get closed in time if a larger inactive bucket interval is configured. In some sense, the new feature is an extended version of the inactive bucket closing feature.
    
    How should we handle that case?
    
    1. throw an exception during configuration, i.e., when `setInactiveBucketThreshold()` and `setBatchRolloverInterval()` are called.
    2. configure the inactive bucket interval to be at least the rollover interval in case it is configured larger and continue. We should also make sure that the check interval is configured appropriately.
    
    I'm leaning towards the first approach. It would make the misconfiguration obvious to the user and fail the program before it is submitted.
    
    What do you think?
    
    Best, Fabian


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183758338
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---
    @@ -436,6 +463,40 @@ public void testScalingUp() throws Exception {
     		checkFs(outDir, 0, 3, 5, 5);
     	}
     
    +	@Test
    +	public void testRolloverInterval() throws Exception {
    +		final File outDir = tempFolder.newFolder();
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(0L);
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 1L));
    +		checkFs(outDir, 1, 0,  0, 0);
    --- End diff --
    
    Remove the double blank.


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    @fhueske Thank you for reviewing. I have incorporated the changes, including an update to the [documentation](https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html) on the website.
    
    Travis is failing on a test case that seems unrelated:
    ```
    java.lang.AssertionError: This program execution should have failed.
    	at org.junit.Assert.fail(Assert.java:88)
    	at org.apache.flink.test.misc.SuccessAfterNetworkBuffersFailureITCase.testSuccessfulProgramAfterFailure(SuccessAfterNetworkBuffersFailureITCase.java:75)
    ```


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183756583
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -536,6 +553,9 @@ private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws
     			partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
     		}
     
    +		// Record the creation time of the bucket
    +		bucketState.firstWrittenToTime = processingTimeService.getCurrentProcessingTime();
    --- End diff --
    
    Rename the field to `bucket.creationTime`? At this point, nothing has been written to the file. Actually, the file has not even been created yet.


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183757920
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -536,6 +553,9 @@ private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws
     			partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
     		}
     
    +		// Record the creation time of the bucket
    +		bucketState.firstWrittenToTime = processingTimeService.getCurrentProcessingTime();
    --- End diff --
    
    I think the current behavior is also more consistent than starting the timeout when the first record is written. 


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5860


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183758663
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---
    @@ -436,6 +463,40 @@ public void testScalingUp() throws Exception {
     		checkFs(outDir, 0, 3, 5, 5);
     	}
     
    +	@Test
    +	public void testRolloverInterval() throws Exception {
    +		final File outDir = tempFolder.newFolder();
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(0L);
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 1L));
    +		checkFs(outDir, 1, 0,  0, 0);
    --- End diff --
    
    check other `checkFs()` calls as well


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183754480
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -472,6 +480,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
     					subtaskIndex,
     					writePosition,
     					batchSize);
    +			} else {
    --- End diff --
    
    update method comment.


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    Thanks for reviewing, @fhueske, @aljoscha, and @kl0u! I have addressed the latest review comments. Can you PTAL (again)?


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    @fhueske Can you PTAL and merge this PR? 


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r186223237
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
     					subtaskIndex,
     					writePosition,
     					batchSize);
    +			} else {
    +				long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
    --- End diff --
    
    Updated method signature for `shouldRoll` to include the `currentProcessingTime` 
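
The effect of that signature change can be sketched as follows: the caller reads the clock once per record and hands the value to every consumer. The class `SingleClockReadSketch` is hypothetical, and a `LongSupplier` stands in for the processing time service.

```java
import java.util.function.LongSupplier;

// Hypothetical write path that reads the clock once per record; not the actual BucketingSink code.
final class SingleClockReadSketch {
    private final LongSupplier clock; // stand-in for the processing time service
    private final long batchRolloverInterval;
    long creationTime;
    long lastWrittenToTime;
    int partCounter;

    SingleClockReadSketch(LongSupplier clock, long creationTime, long batchRolloverInterval) {
        this.clock = clock;
        this.creationTime = creationTime;
        this.batchRolloverInterval = batchRolloverInterval;
    }

    void write(String element) {
        // Single clock read, reused for the roll decision and the bookkeeping below.
        long currentProcessingTime = clock.getAsLong();
        if (shouldRoll(currentProcessingTime)) {
            partCounter++;
            creationTime = currentProcessingTime;
        }
        // The actual write of the element is omitted in this sketch.
        lastWrittenToTime = currentProcessingTime;
    }

    private boolean shouldRoll(long currentProcessingTime) {
        return currentProcessingTime - creationTime > batchRolloverInterval;
    }
}
```

Passing the timestamp in also makes the roll decision a pure function of its inputs, which simplifies testing with a controlled clock.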


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183754129
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -87,9 +87,11 @@
      * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
      * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
      * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
    - * When a part file becomes bigger than the user-specified batch size the current part file is closed,
    - * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
    - * this can be configured using {@link #setBatchSize(long)}.
    + * When a part file becomes bigger than the user-specified batch size or when the part file becomes older
    + * than the user-specified roll over interval the current part file is closed,the part counter is increased
    + * and a new part file is created. The batch size defaults to {@code 384MB},this can be configured
    --- End diff --
    
    Add space `{@code 384MB},this` -> `{@code 384MB}, this`


---

[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5860#discussion_r183753866
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -87,9 +87,11 @@
      * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
      * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
      * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
    - * When a part file becomes bigger than the user-specified batch size the current part file is closed,
    - * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
    - * this can be configured using {@link #setBatchSize(long)}.
    + * When a part file becomes bigger than the user-specified batch size or when the part file becomes older
    + * than the user-specified roll over interval the current part file is closed,the part counter is increased
    --- End diff --
    
    Add space `closed,the` -> `closed, the`


---

[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

Posted by glaksh100 <gi...@git.apache.org>.
Github user glaksh100 commented on the issue:

    https://github.com/apache/flink/pull/5860
  
    @aljoscha @fhueske Can you please take a look? 


---