Posted to issues@flink.apache.org by zhangminglei <gi...@git.apache.org> on 2018/07/19 14:47:31 UTC

[GitHub] flink pull request #6375: [FLINK-9609] [connectors] Add bucket ready mechani...

GitHub user zhangminglei opened a pull request:

    https://github.com/apache/flink/pull/6375

    [FLINK-9609] [connectors] Add bucket ready mechanism for BucketingSin…

    ## What is the purpose of the change
    Currently, BucketingSink only supports ```notifyCheckpointComplete```. However, users want to do some extra work when a bucket is ready. It would be nice if we could support a BucketReady mechanism, or otherwise tell users when a bucket is ready for use. For example, suppose one bucket is created every 5 minutes. At the end of those 5 minutes, before the next bucket is created, the user might need to do something because the previous bucket is ready, such as sending the bucket-ready timestamp to a server or doing some other work.
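A minimal sketch of the 5-minute example, assuming buckets are aligned to fixed 5-minute windows counted from the Unix epoch (UTC). `TimeBucketWindow`, `bucketStart` and `bucketEnd` are hypothetical names, not part of BucketingSink; the end of a bucket is simply the earliest moment it could be reported as ready.

```java
import java.time.Duration;

/**
 * Hypothetical helper for the 5-minute example: buckets are assumed to be
 * aligned to fixed-length windows counted from the Unix epoch (UTC).
 */
final class TimeBucketWindow {

    private TimeBucketWindow() {}

    /** Start (epoch millis) of the bucket that contains timestampMillis. */
    static long bucketStart(long timestampMillis, Duration bucketLength) {
        long lengthMillis = bucketLength.toMillis();
        // floorMod keeps the alignment correct for timestamps before the epoch too.
        return timestampMillis - Math.floorMod(timestampMillis, lengthMillis);
    }

    /** End (epoch millis, exclusive) of that bucket: the earliest time it could be reported ready. */
    static long bucketEnd(long timestampMillis, Duration bucketLength) {
        return bucketStart(timestampMillis, bucketLength) + bucketLength.toMillis();
    }
}
```

For a record at 2018-07-19T14:38:45Z, `bucketStart` gives 14:35:00Z and `bucketEnd` gives 14:40:00Z, so 14:40:00Z is the timestamp that could be sent to the server once that bucket's files are finalized.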
    
    Here, a bucket is ready when none of the part files under it has a .pending or .in-progress suffix; at that point we can consider the bucket ready for users. This is similar to a watermark, which means that no elements with a timestamp older than or equal to the watermark's timestamp should arrive at the window. We could borrow the concept of a watermark here, and perhaps even call this a BucketWatermark.
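The readiness rule above can be sketched as a pure predicate over the file names in one bucket. This assumes the sink's suffixes are `.pending` and `.in-progress` (they are passed as parameters here), and `BucketReadiness.isReady` is a hypothetical helper rather than the PR's code.

```java
import java.util.Collection;

/**
 * Sketch of the readiness rule from the PR description: a bucket is ready
 * when none of its part files still has the pending or in-progress suffix.
 */
final class BucketReadiness {

    private BucketReadiness() {}

    static boolean isReady(Collection<String> partFileNames,
                           String pendingSuffix,
                           String inProgressSuffix) {
        for (String name : partFileNames) {
            if (name.endsWith(pendingSuffix) || name.endsWith(inProgressSuffix)) {
                return false; // this part file has not been finalized yet
            }
        }
        return true; // no pending or in-progress part file remains
    }
}
```

Under this rule an empty bucket also counts as ready.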
    
    ## Brief change log
    Add an interface ```BucketReady```.
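The diff quoted in the review shows BucketingSink overriding `isBucketReady(Set<Path>)`, so `BucketReady` presumably declares that method. Below is a hedged sketch of that shape, made generic over the path type so it compiles without Hadoop, together with a hypothetical `BucketReadyNotifier` that forwards ready buckets to a user callback. In the quoted diff the boolean returned by `isBucketReady(partitionPaths)` is discarded, so a hook along these lines would presumably be needed to actually tell users.

```java
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical shape of the BucketReady interface, inferred from the
 * isBucketReady(Set<Path>) override in the quoted diff. Generic over the
 * path type so the sketch does not depend on Hadoop.
 */
interface BucketReady<P> {
    /** Returns true only if every bucket in bucketPaths is ready. */
    boolean isBucketReady(Set<P> bucketPaths);
}

/**
 * Hypothetical helper that turns the readiness check into a notification;
 * it is meant to be called with the bucket paths committed by a checkpoint.
 */
final class BucketReadyNotifier<P> {
    private final BucketReady<P> readiness;
    private final Consumer<Set<P>> onReady;

    BucketReadyNotifier(BucketReady<P> readiness, Consumer<Set<P>> onReady) {
        this.readiness = readiness;
        this.onReady = onReady;
    }

    /** Invokes the callback and returns true if all given buckets are ready. */
    boolean notifyIfReady(Set<P> bucketPaths) {
        if (readiness.isBucketReady(bucketPaths)) {
            onReady.accept(bucketPaths);
            return true;
        }
        return false;
    }
}
```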
    
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhangminglei/flink flink-9609-bucketready

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6375.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6375
    
----
commit f95894956ac15d09b51b3a232d6f83227582e641
Author: zhangminglei <zm...@...>
Date:   2018-07-19T14:38:45Z

    [FLINK-9609] [connectors] Add bucket ready mechanism for BucketingSink when checkpoint complete

----


---

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6375#discussion_r203996832
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     					}
     				}
     			}
    +			isBucketReady(partitionPaths);
     		}
     	}
     
    +	@Override
    +	public boolean isBucketReady(Set<Path> bucketPathes) {
    +		for (Path path : bucketPathes) {
    +			try {
    +				RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
    +				while (files.hasNext()) {
    +					LocatedFileStatus fileStatus = files.next();
    +					if (fileStatus.getPath().getName().endsWith(pendingSuffix) ||
    +						fileStatus.getPath().getName().endsWith(inProgressSuffix)) {
    +						return false;
    +					}
    +				}
    +				return true;
    --- End diff --
    
    I mean this return statement: it cannot verify that all the bucket paths are ready, right? Because the loop has not finished yet.
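A sketch of the fix the reviewer is pointing at: `return true` moves after the loop, so `true` is returned only once every bucket path has been scanned, while a single unfinished file still short-circuits to `false`. To stay self-contained it lists directories with `java.nio.file.Files.list` instead of Hadoop's `FileSystem.listFiles`, and takes the suffixes as parameters; `AllBucketsReady` is a hypothetical class name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Stream;

/**
 * Sketch of a corrected isBucketReady: "not ready" may short-circuit,
 * but "ready" is only decided after every bucket has been scanned.
 * Uses java.nio.file in place of Hadoop's FileSystem (assumption).
 */
final class AllBucketsReady {

    private AllBucketsReady() {}

    static boolean isBucketReady(Set<Path> bucketPaths,
                                 String pendingSuffix,
                                 String inProgressSuffix) throws IOException {
        for (Path bucket : bucketPaths) {
            // Non-recursive listing, like fs.listFiles(path, false) in the diff;
            // try-with-resources closes the directory stream.
            try (Stream<Path> files = Files.list(bucket)) {
                boolean hasUnfinishedFile = files
                        .filter(Files::isRegularFile)
                        .map(file -> file.getFileName().toString())
                        .anyMatch(name -> name.endsWith(pendingSuffix)
                                || name.endsWith(inProgressSuffix));
                if (hasUnfinishedFile) {
                    return false; // one unfinished file is enough to say "not ready"
                }
            }
        }
        // Reached only after all buckets were checked: this is where "return true" belongs.
        return true;
    }
}
```

If the first bucket scanned is finished and a later one still holds a `.pending` file, the quoted hunk returns `true` after the first bucket; this version returns `false`.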


---

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6375#discussion_r203997376
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     					}
     				}
     			}
    +			isBucketReady(partitionPaths);
     		}
     	}
     
    +	@Override
    +	public boolean isBucketReady(Set<Path> bucketPathes) {
    +		for (Path path : bucketPathes) {
    +			try {
    +				RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
    +				while (files.hasNext()) {
    +					LocatedFileStatus fileStatus = files.next();
    +					if (fileStatus.getPath().getName().endsWith(pendingSuffix) ||
    +						fileStatus.getPath().getName().endsWith(inProgressSuffix)) {
    +						return false;
    +					}
    +				}
    +				return true;
    --- End diff --
    
    Ahhh. Oops! Yes, you are very right! :)


---

[GitHub] flink issue #6375: [FLINK-9609] [connectors] Add bucket ready mechanism for ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6375
  
    Hi @tillrohrmann, could you please take a look at this PR? Actually, I don't know which committer is most familiar with ```BucketingSink```. Thank you so much.


---

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6375#discussion_r203995868
  
    --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
    @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     					}
     				}
     			}
    +			isBucketReady(partitionPaths);
     		}
     	}
     
    +	@Override
    +	public boolean isBucketReady(Set<Path> bucketPathes) {
    --- End diff --
    
    If we finish the for loop without having returned true, we return false. I think this is reasonable: if a file ends with the pending or in-progress suffix, the file is not ready and we cannot use it.
    



---