Posted to user@flink.apache.org by Samra Kasim <sa...@thehumangeo.com> on 2017/01/13 16:48:55 UTC

WindowFunction to push data from Kafka to S3

Hi,

I am reading messages off a Kafka topic and want to process them
through Flink and save them into S3. It was pointed out to me that the
Kafka stream can't be written to S3 directly, because S3 doesn't allow
data to be appended to a file, so I want to convert the Kafka stream
into batches and save those to S3. Based on other user
questions/answers, it looks like this is possible by using windowing to
break the stream into batches and create files. I have written the
following code, but it doesn't work, and I am not getting any errors
either. I have a System.out.println that shows the tuples are being
processed, but they don't seem to be emitted by out.collect. Can
someone help me figure out what the issue may be? Thanks!

public class S3Sink {

    public static void main(String[] args) throws Exception {

        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(parameterTool.get("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        String uuid = UUID.randomUUID().toString();

        DataStreamSink<Tuple2<String, String>> tuple2DataStream = messageStream
                .flatMap(new Tupler())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new MyWindowFunction())
                .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");

        env.execute();
    }

    private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> {

        @Override
        public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception {
            out.collect(new Tuple2<String, String>("record", record));
        }
    }

    private static class MyWindowFunction implements
            WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, String>> input,
                          Collector<Tuple2<String, String>> out) throws Exception {
            for (Tuple2<String, String> in : input) {
                System.out.println(in);
                out.collect(in);
            }
        }
    }
}

-- 

Thanks,

Sam

Re: WindowFunction to push data from Kafka to S3

Posted by Ufuk Celebi <uc...@apache.org>.
Can you check the log files of the TaskManagers and JobManager?

There is no obvious reason why the collect should not work.

On another note: the rolling file sink might be what you are looking for.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html
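
For reference, usage looks roughly like the example on that page,
adapted here to the Tuple2 stream from your flatMap (the bucketer
format and batch size are just the illustrative values from the docs):

import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// The sink rolls over to new part files by size/time, so no appending
// to an existing S3 object is ever needed.
BucketingSink<Tuple2<String, String>> sink =
        new BucketingSink<Tuple2<String, String>>("s3://flink-test/flink-output-stream");
sink.setBucketer(new DateTimeBucketer<Tuple2<String, String>>("yyyy-MM-dd--HHmm")); // one bucket per minute
sink.setWriter(new StringWriter<Tuple2<String, String>>()); // writes element.toString() per line
sink.setBatchSize(1024 * 1024 * 400); // start a new part file at ~400 MB

messageStream
        .flatMap(new Tupler())
        .addSink(sink);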
