Posted to user@flink.apache.org by ant burton <ap...@gmail.com> on 2017/08/16 21:27:49 UTC
Re: Access to datastream from BucketSink- RESOLVED
I have resolved my issue, thank you for your help.
The following code gives me access to an element, which I can use to determine a bucket directory name:
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

// Bucketer for BucketingSink: getBucketPath receives each element,
// so the bucket directory can depend on the element's content.
public class S3Bucketer implements Bucketer<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Now that we have access to the element, we can
        // generate an S3 bucket path from it.
        // Placeholder: replace with a non-empty name derived from element,
        // because Hadoop's Path rejects an empty string.
        String s3_filename_path = "";
        // Resolve the bucket relative to the sink's base path.
        return new Path(basePath, s3_filename_path);
    }
}
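One way to fill in the placeholder above, sketched under assumptions: suppose (hypothetically) each element is a comma-separated line whose first field is a key such as a customer ID. Keeping the naming logic in a plain static method makes it testable without a running Flink job. The class name `S3BucketNames`, the method `bucketFor`, the record format and the `"unknown"` fallback are all illustrative, not part of Flink.

```java
// Hypothetical helper: derives a bucket directory name from a record.
// Assumes (for illustration only) records look like "key,rest,of,fields".
public final class S3BucketNames {

    private S3BucketNames() {
    }

    /** Returns a non-empty directory name derived from the element's first field. */
    public static String bucketFor(String element) {
        if (element == null || element.isEmpty()) {
            return "unknown";
        }
        int comma = element.indexOf(',');
        String key = comma >= 0 ? element.substring(0, comma) : element;
        // Keep only characters that are safe in an S3 key prefix / path segment.
        String safe = key.trim().replaceAll("[^A-Za-z0-9._-]", "_");
        return safe.isEmpty() ? "unknown" : safe;
    }

    public static void main(String[] args) {
        System.out.println(bucketFor("customer-42,2017-08-16,click")); // customer-42
        System.out.println(bucketFor("a/b c,x"));                      // a_b_c
        System.out.println(bucketFor(""));                             // unknown
    }
}
```

Inside `S3Bucketer.getBucketPath`, the placeholder could then become `return new Path(basePath, S3BucketNames.bucketFor(element));`, so every record lands under a per-key directory beneath the sink's base path.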
> On 16 Aug 2017, at 16:06, Kostas Kloudas <k....@data-artisans.com> wrote:
>
> Hi Ant,
>
> I think you are implementing the wrong Bucketer.
> This seems to be the one for the RollingSink which is deprecated.
> Is this correct?
>
> You should implement the BucketingSink one, which is in the package:
>
> org.apache.flink.streaming.connectors.fs.bucketing
>
> That one requires implementing a single method with the signature:
>
> Path getBucketPath(Clock clock, Path basePath, T element);
>
> which, from what I understand of your requirements, gives you access
> to the element that you need.
>
> Cheers,
> Kostas
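This signature solves the problem from the original question because the sink asks the bucketer for a path for each incoming element, instead of fixing the path once when `addSink` is called. The Flink-free model below illustrates that idea under simplifying assumptions: `PerElementBucketer` and `route` are hypothetical stand-ins (plain strings replace `Clock` and Hadoop `Path`), not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of per-element bucketing; this is NOT the Flink API.
public final class BucketingModel {

    /** Stand-in for Bucketer<T>.getBucketPath, using plain strings instead of Path/Clock. */
    public interface PerElementBucketer<T> {
        String getBucketPath(String basePath, T element);
    }

    /** Mimics a sink: asks the bucketer for a bucket path for every element it receives. */
    public static <T> Map<String, List<T>> route(String basePath, List<T> elements,
                                                 PerElementBucketer<T> bucketer) {
        Map<String, List<T>> buckets = new LinkedHashMap<>();
        for (T element : elements) {
            String path = bucketer.getBucketPath(basePath, element);
            buckets.computeIfAbsent(path, p -> new ArrayList<>()).add(element);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Hypothetical rule: bucket by the first comma-separated field of each record.
        PerElementBucketer<String> byKey =
                (base, element) -> base + "/" + element.split(",", 2)[0];
        Map<String, List<String>> result =
                route("s3a://flink", Arrays.asList("a,1", "b,2", "a,3"), byKey);
        System.out.println(result); // {s3a://flink/a=[a,1, a,3], s3a://flink/b=[b,2]}
    }
}
```

The base path passed to the sink's constructor stays fixed; only the part returned by the bucketer varies from record to record.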
>
>> On Aug 16, 2017, at 3:31 PM, ant burton <ap...@gmail.com> wrote:
>>
>>
>> Thanks Kostas,
>>
>> I’m narrowing in on a solution:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html says: "You can also specify a custom bucketer by using setBucketer() on a BucketingSink. If desired, the bucketer can use a property of the element or tuple to determine the bucket directory."
>>
>> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
>> sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
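For comparison, `DateTimeBucketer` ignores the element and names the bucket after the current time, formatted with the given pattern. The plain-Java sketch below only shows which names the `"yyyy-MM-dd--HHmm"` pattern produces. It uses `java.text.SimpleDateFormat` with a fixed timestamp and UTC as assumptions so the output is reproducible, whereas the real bucketer reads the time from the `Clock` it is given. The class `DateTimeBucketNameDemo` is hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustrates the bucket names produced by the "yyyy-MM-dd--HHmm" pattern.
// Uses a caller-supplied timestamp and UTC (assumptions) so the result is reproducible.
public final class DateTimeBucketNameDemo {

    public static String bucketName(long epochMillis) {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd--HHmm");
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long t = 1502918869000L; // 2017-08-16 21:27:49 UTC
        System.out.println("/base/path/" + bucketName(t)); // /base/path/2017-08-16--2127
    }
}
```

Every record that arrives within the same minute shares one bucket directory, so a time-based bucketer routes by arrival time only, never by record content.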
>> Therefore I’ve created a skeleton class:
>>
>> import org.apache.flink.streaming.connectors.fs.Bucketer;
>> import org.apache.hadoop.fs.Path;
>>
>> public class S3Bucketer implements Bucketer {
>>     private static final long serialVersionUID = 1L;
>>
>>     public S3Bucketer() {
>>     }
>>
>>     @Override
>>     public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
>>         return true;
>>     }
>>
>>     // Only the base path is passed in here; the stream's elements are not available.
>>     @Override
>>     public Path getNextBucketPath(Path basePath) {
>>         return new Path(basePath + "/some-path-that-I-need-create-from-the-stream");
>>     }
>> }
>>
>> My question now is: how do I access the data stream from within the S3Bucketer, so that I can generate a filename based on the data within the data stream?
>>
>> Thanks,
>>
>>> On 16 Aug 2017, at 12:55, Kostas Kloudas <k....@data-artisans.com> wrote:
>>>
>>> In the second link for the BucketingSink, you can set your
>>> own Bucketer using the setBucketer method. You do not have to
>>> implement your own sink from scratch.
>>>
>>> Kostas
>>>
>>>> On Aug 16, 2017, at 1:39 PM, ant burton <ap...@gmail.com> wrote:
>>>>
>>>> or rather https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>>>
>>>>
>>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k....@data-artisans.com> wrote:
>>>>>
>>>>> Hi Ant,
>>>>>
>>>>> I think you can do it by implementing your own Bucketer.
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>>> On Aug 16, 2017, at 1:09 PM, ant burton <ap...@gmail.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Given
>>>>>>
>>>>>> // Set StreamExecutionEnvironment
>>>>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>> // Use event-time semantics (timestamps taken from the events)
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>
>>>>>> // Add source (input stream)
>>>>>> DataStream<String> dataStream = StreamUtil.getDataStream(env, params);
>>>>>>
>>>>>> How can I construct the s3_filename from the content of an event? It seems that whenever I attempt this, I have access either to an event or to .addSink, but not both.
>>>>>>
>>>>>> dataStream.addSink(new BucketingSink<String>("s3a://flink/" + s3_filename));
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>