Posted to user@flink.apache.org by Sathi Chowdhury <Sa...@elliemae.com> on 2017/03/02 08:44:14 UTC

NPE while writing to s3://

I get an NPE from the code below.
I am running it from my Mac on a local Flink cluster.


RollingSink<String> s3Sink = new RollingSink<String>("s3://sc-sink1/");
s3Sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
s3Sink.setWriter(new StringWriter<String>());
s3Sink.setBatchSize(200);
s3Sink.setPendingPrefix("file-");
s3Sink.setPendingSuffix(".txt");
outStream.addSink(s3Sink).setParallelism(1);
This causes:

java.lang.NullPointerException
            at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:463)
            at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:410)
            at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
            at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
            at java.lang.Thread.run(Thread.java:745)

I even tried changing the s3:// path to a local path and got the same exception.
The call below works and writes out the stream, so the stream does have data.

outStream.writeAsText("/Users/schowdhury/flink/kinesisread"+System.currentTimeMillis());



Am I missing something obvious? It looks like it is trying to create a folder.


Re: NPE while writing to s3://

Posted by Till Rohrmann <tr...@apache.org>.
Hi Sathi,

Which version of Flink are you using? Since Flink 1.2 the RollingSink is
deprecated. It is now recommended to use the BucketingSink. Maybe this
problem is resolved with the newer sink.

Cheers,
Till
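
On the "trying to create a folder" observation: a date-time bucketer writes part files into a subdirectory of the base path named after the formatted time, so directory creation is expected. The following is only a rough sketch of that naming, not Flink's code. It assumes the pattern is read as a java.text.SimpleDateFormat pattern; the class BucketPathSketch, the method bucketPath and the fixed UTC time zone are made up for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketPathSketch {

    // Hypothetical helper (not a Flink API): appends the formatted
    // timestamp to the base path, giving one directory per time bucket.
    public static String bucketPath(String basePath, String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        // Assumption: fix the zone to UTC so the result is reproducible.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        return base + format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // 1488444254000L is 2017-03-02 08:44:14 UTC, used here only as sample input.
        System.out.println(bucketPath("s3://sc-sink1/", "yyyy-MM-dd--HHmm", 1488444254000L));
        // prints s3://sc-sink1/2017-03-02--0844
    }
}
```

With the pattern from the original post, every minute maps to its own directory, so the sink needs permission to create directories under the base path.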
