Posted to user@flink.apache.org by ant burton <ap...@gmail.com> on 2017/08/16 21:27:49 UTC

Re: Access to datastream from BucketSink- RESOLVED

I have resolved my issue, thank you for your help.

The following code gives me access to the element, which I can use to determine the bucket directory name.

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

public class S3Bucketer implements Bucketer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // We now have access to the element, so we can
        // derive an S3 path from it. The empty string below is only a
        // placeholder: Hadoop's Path rejects empty strings, so replace it.
        String s3_filename_path = "";

        return new Path(s3_filename_path);
    }
}
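
For anyone finding this thread later, here is a minimal sketch of how the directory name could be derived from the element. The class name BucketNames, the method fromElement, and the "<key>,<payload>" element format are assumptions made up for illustration; they are not part of Flink or of the code above. The helper uses only the JDK, so it can be tried without a Flink or Hadoop dependency.

```java
// Hypothetical helper (not part of Flink): turns a stream element into
// a string that is safe to use as a single bucket directory name.
final class BucketNames {
    private BucketNames() {}

    // Assumes, for illustration only, that each element looks like "<key>,<payload>".
    static String fromElement(String element) {
        if (element == null) {
            return "unknown";
        }
        // Take everything before the first comma, or the whole element if there is none.
        int comma = element.indexOf(',');
        String key = (comma >= 0 ? element.substring(0, comma) : element).trim();
        // Replace every character outside [A-Za-z0-9._-] so the result cannot
        // contain path separators or whitespace.
        String safe = key.replaceAll("[^A-Za-z0-9._-]", "_");
        // Avoid empty names and the special segments "." and "..".
        if (safe.isEmpty() || safe.equals(".") || safe.equals("..")) {
            return "unknown";
        }
        return safe;
    }
}
```

With a helper like this, the body of getBucketPath could be reduced to something like return new Path(basePath, BucketNames.fromElement(element)); and the bucketer would then be registered with sink.setBucketer(new S3Bucketer()). Sanitising the key first is a design choice: it keeps an unexpected element from producing nested or invalid directories under the base path.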


> On 16 Aug 2017, at 16:06, Kostas Kloudas <k....@data-artisans.com> wrote:
> 
> Hi Ant,
> 
> I think you are implementing the wrong Bucketer. 
> This seems to be the one for the RollingSink which is deprecated. 
> Is this correct?
> 
> You should implement the BucketingSink one, which is in the package:
> 
> org.apache.flink.streaming.connectors.fs.bucketing
> 
> That one requires the implementation of 1 method with signature:
> 
> Path getBucketPath(Clock clock, Path basePath, T element);
> 
> which, from what I understand of your requirements, gives you access
> to the element that you need.
> 
> Cheers,
> Kostas
> 
>> On Aug 16, 2017, at 3:31 PM, ant burton <ap...@gmail.com> wrote:
>> 
>> 
>> Thanks Kostas,
>> 
>> I’m narrowing in on a solution:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html says "You can also specify a custom bucketer by using setBucketer() on a BucketingSink. If desired, the bucketer can use a property of the element or tuple to determine the bucket directory."
>> 
>> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
>> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
>>
>> Therefore I’ve created a skeleton class:
>> 
>> public class S3Bucketer implements Bucketer {
>> 	private static final long serialVersionUID = 1L;
>> 
>> 	private final String formatString;
>> 
>> 	public S3Bucketer() {
>> 	}
>> 
>> 	private void readObject(ObjectInputStream in) {
>> 		in.defaultReadObject();
>> 	}
>> 
>> 	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
>> 		return true;
>> 	}
>> 
>> 	public Path getNextBucketPath(Path basePath) {
>> 		return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
>> 	}
>> }
>> 
>> My question now is: how do I access the data stream from within the S3Bucketer, so that I can generate a filename based on the data in the stream?
>> 
>> Thanks,
>> 
>>> On 16 Aug 2017, at 12:55, Kostas Kloudas <k....@data-artisans.com> wrote:
>>> 
>>> In the second link for the BucketingSink, you can set your 
>>> own Bucketer using the setBucketer method. You do not have to 
>>> implement your own sink from scratch.
>>> 
>>> Kostas
>>> 
>>>> On Aug 16, 2017, at 1:39 PM, ant burton <ap...@gmail.com> wrote:
>>>> 
>>>> or rather https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>>> 
>>>> 
>>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k....@data-artisans.com> wrote:
>>>>> 
>>>>> Hi Ant,
>>>>> 
>>>>> I think you can do it by implementing your own Bucketer.
>>>>> 
>>>>> Cheers,
>>>>> Kostas
>>>>> 
>>>>>> On Aug 16, 2017, at 1:09 PM, ant burton <ap...@gmail.com> wrote:
>>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> Given 
>>>>>> 
>>>>>>      // Set StreamExecutionEnvironment
>>>>>>      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> 
>>>>>>      // Use event time as the stream time characteristic
>>>>>>      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>> 
>>>>>>      // Add source (input stream)
>>>>>>      DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>>>> 
>>>>>> How can I construct the s3_filename from the content of an event? Whenever I attempt this, I have access either to an event or to .addSink, but not both.
>>>>>> 
>>>>>> 	dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>>>> 
>>>>>> 
>>>>>> Thanks,