Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2018/10/01 08:52:18 UTC

Re: Streaming to Parquet Files in HDFS

Hi Bill,

Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the
previously mentioned StreamingFileSink [1], [2].
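A minimal sketch of such a job (assuming an Avro-mappable class LogEvent and a
DataStream[LogEvent] named events; checkpointing must be enabled, since
bulk-encoded sinks finalize part files on checkpoints) would look like:

	import org.apache.flink.core.fs.Path
	import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
	import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

	// Each Avro record becomes a row of a Parquet file under the given base path.
	val sink: StreamingFileSink[LogEvent] = StreamingFileSink
		.forBulkFormat(new Path("hdfs:///data/logs"),
			ParquetAvroWriters.forReflectRecord(classOf[LogEvent]))
		.build()

	events.addSink(sink)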

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9753
[2] https://issues.apache.org/jira/browse/FLINK-9750

On Fri, Sep 28, 2018 at 11:36 PM hao gao <ha...@gmail.com> wrote:

> Hi Bill,
>
> I wrote those two Medium posts you mentioned above, but clearly the
> techlab one is much better.
> I would suggest just "close the file when checkpointing", which is the
> easiest way. If you use BucketingSink, you can modify the code to make it
> work: just replace the code from line 691 to 693 with
> closeCurrentPartFile()
>
> https://github.com/apache/flink/blob/release-1.3.2-rc1/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L691
> This should guarantee exactly-once. You may have some files with an
> underscore prefix when the Flink job fails, but those files are usually
> ignored by query engines/readers, for example Presto.
>
> If you use 1.6 and later, I think the issue is already addressed
> https://issues.apache.org/jira/browse/FLINK-9750
>
> Thanks
> Hao
>
> On Fri, Sep 28, 2018 at 1:57 PM William Speirs <ws...@apache.org> wrote:
>
>> I'm trying to stream log messages (syslog fed into Kafka) into Parquet
>> files on HDFS via Flink. I'm able to read, parse, and construct objects for
>> my messages in Flink; however, writing to Parquet is tripping me up. I do
>> *not* need to have this be real-time; a delay of a few minutes, even up to
>> an hour, is fine.
>>
>> I've found the following articles talking about this being very difficult:
>> *
>> https://medium.com/hadoop-noob/a-realtime-flink-parquet-data-warehouse-df8c3bd7401
>> * https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
>> *
>> https://techlab.bol.com/how-not-to-sink-a-data-stream-to-files-journeys-from-kafka-to-parquet/
>> *
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-td11123.html
>>
>> All of these posts speak of troubles with the checkpointing mechanisms
>> and Parquet's need to perform batch writes. I'm not experienced enough with
>> Flink's checkpointing or Parquet's file format to completely understand
>> the issue. So my questions are as follows:
>>
>> 1) Is this possible in Flink in an exactly-once way? If not, is it
>> possible in a way that _might_ cause duplicates during an error?
>>
>> 2) Is there another/better format to use other than Parquet that offers
>> compression and the ability to be queried by something like Drill or Impala?
>>
>> 3) Any further recommendations for solving the overall problem: ingesting
>> syslogs and writing them to a file(s) that is searchable by an SQL(-like)
>> framework?
>>
>> Thanks!
>>
>> Bill-
>>
>
>
> --
> Thanks
>  - Hao
>

Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

Thanks for the info. That error was caused by building your code against an
out-of-date baseline. After rebasing my branch, the issue is gone.
I've been testing, and so far I have the following questions/issues:

1. I'm not able to write to S3 with the following URI format: *s3*://<path>,
and had to use *s3a*://<path>. Is this behaviour expected? (I am running
Flink on AWS EMR, and I thought that EMR provides a wrapper for HDFS over S3
with something called EMRFS).

2. Occasionally/randomly I get the message below ( parquet_error1.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error1.log>
). I'm using the ParquetAvroWriters.forReflectRecord() method to write Scala
case classes. Re-running the job doesn't hit the error at the same data
location, so I don't think there's an issue with the data.
 *java.lang.ArrayIndexOutOfBoundsException: <some random number>* at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.fallBackDictionaryEncodedData

3. Sometimes I get this error message when I use a parallelism of 8 for the
sink ( parquet_error2.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error2.log>
).
Reducing it to 2 solves the issue, but is it possible to increase the pool
size? I could not find any place where I can change the
fs.s3.maxconnections parameter.
java.io.InterruptedIOException: initiate MultiPartUpload on
Test/output/dt=2018-09-20/part-7-5:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool

4. Where is the temporary folder in which the Parquet file is stored before
being uploaded to S3?

Thanks a lot for your help.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi,

Yes, please enable DEBUG for org.apache.flink.streaming to see all the
logs, including those from the StreamTask.
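
For example, with a default log4j setup this corresponds to a line like the
following in conf/log4j.properties (adjust accordingly if you use logback):

	log4j.logger.org.apache.flink.streaming=DEBUG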

A checkpoint is "valid" as soon as it gets acknowledged.
As the documentation says, the job will restart from
"the last **successful** checkpoint", which is the most
recent acknowledged one.

Cheers,
Kostas  

> On Oct 7, 2018, at 1:03 PM, Averell <lv...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Yes, I set the level to DEBUG, but only for
> org.apache.flink.streaming.api.functions.sink.filesystem.bucket.
> I will try to enable it for org.apache.flink.streaming.
> One possible issue with my build is that I had not used the
> latest master branch when merging with your PR, so I might have missed some
> other important PRs.
> 
> BTW, regarding your comment "Checkpoints and acknowledgments are not
> necessarily aligned": if, for some reason, the job crashes
> right after the checkpoint and before the acknowledgements, does that mean
> the last checkpoint is not valid?
> 
> Thanks and best regards,
> Averell
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

Yes, I set the level to DEBUG, but only for
org.apache.flink.streaming.api.functions.sink.filesystem.bucket.
I will try to enable it for org.apache.flink.streaming.
One possible issue with my build is that I had not used the
latest master branch when merging with your PR, so I might have missed some
other important PRs.

BTW, regarding your comment "Checkpoints and acknowledgments are not
necessarily aligned": if, for some reason, the job crashes
right after the checkpoint and before the acknowledgements, does that mean
the last checkpoint is not valid?

Thanks and best regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi,

I just saw that you have already set the level to DEBUG.

Are these all the DEBUG logs of the TM when running on YARN?

Also, did you try to wait a bit longer to see if the acknowledgements of the
checkpoints arrive later? Checkpoints and acknowledgments are not necessarily aligned.

Kostas

> On Oct 7, 2018, at 12:37 PM, Kostas Kloudas <k....@data-artisans.com> wrote:
> 
> Hi Averell,
> 
> Could you set your logging to DEBUG?
> This may shed some light on what is happening as it will contain more logs.
> 
> Kostas
> 
>> On Oct 7, 2018, at 11:03 AM, Averell <lv...@gmail.com> wrote:
>> 
>> Hi Kostas,
>> 
>> I'm using a build with your PR. However, it seems the issue is not with S3,
>> as when I tried to write to the local file system (file:///, not HDFS), I also
>> got the same problem - only the first part was published. All remaining parts
>> stayed in-progress and had names prefixed with "."
>> 
>> From Flink GUI, all checkpoints were shown as completed successfully. 
>> 
>> How could I debug further?
>> 
>> Thanks a lot for your help.
>> Regards,
>> Averell
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 


Re: Streaming to Parquet Files in HDFS

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Averell,

Could you set your logging to DEBUG?
This may shed some light on what is happening as it will contain more logs.

Kostas

> On Oct 7, 2018, at 11:03 AM, Averell <lv...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> I'm using a build with your PR. However, it seems the issue is not with S3,
> as when I tried to write to the local file system (file:///, not HDFS), I also
> got the same problem - only the first part was published. All remaining parts
> stayed in-progress and had names prefixed with "."
> 
> From Flink GUI, all checkpoints were shown as completed successfully. 
> 
> How could I debug further?
> 
> Thanks a lot for your help.
> Regards,
> Averell
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

I'm using a build with your PR. However, it seems the issue is not with S3,
as when I tried to write to the local file system (file:///, not HDFS), I also
got the same problem - only the first part was published. All remaining parts
stayed in-progress and had names prefixed with "."

From Flink GUI, all checkpoints were shown as completed successfully. 

How could I debug further?

Thanks a lot for your help.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Averell,

From the logs, only checkpoint 2 was acknowledged (search for
"received completion notification for checkpoint with id="), and this is
why no more files are finalized. So only checkpoint 2 was successfully
completed.

BTW, are you using the PR you mentioned before, or Flink 1.6?
I am asking because in Flink 1.6 there is no support for S3 in the
streaming file sink.

Cheers,
Kostas

> On Oct 7, 2018, at 2:02 AM, Averell <lv...@gmail.com> wrote:
> 
> received completion notification for checkpoint wit


Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

Please ignore my previous email about the security issue. It seems
I had mixed versions of shaded and unshaded jars.

However, I'm now facing another issue with writing Parquet files: only the
first part is closed. All subsequent parts are kept in the in-progress state
forever. I checkpoint every 3 minutes. Sink parallelism is set to 1
(setting it to 4 or 30 made no difference).
The bucket ID assigner uses the event timestamp.
I only get this issue when running Flink on a YARN cluster, whether writing
to file:/// or to S3. When I run it on my laptop, I get one part for every
single checkpoint.
The TM log says something like "*BucketState ... has pending files for
checkpoints: {2 }*"

Could you please help me with how to debug this further?

Here below is the TM log:

2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537656300000,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {2 }
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
received completion notification for checkpoint with id=2.
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537656300000,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:43:18.401 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 }
2018-10-06 14:45:59.276 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537657200000,meter0218455,R1.S1.LT1.P10).
2018-10-06 14:45:59.334 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-2" for bucket id=dt=2018-09-22.
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=4 (max part counter=3).
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:46:18.228 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 }
2018-10-06 14:46:25.041 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537657200000,meter0209471,R1.S1.LT1.P25).
2018-10-06 14:46:25.186 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-3" for bucket id=dt=2018-09-22.
2018-10-06 14:49:17.848 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=5 (max part counter=4).
2018-10-06 14:49:17.849 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:49:18.385 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 5 }
2018-10-06 14:52:17.824 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=6 (max part counter=4).
2018-10-06 14:52:17.825 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 5 }

Thanks and best regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

I tried your PR - trying to write to S3 from Flink running on AWS - and I got
the following error. I copied the three jar files
flink-hadoop-fs-1.7-SNAPSHOT.jar, flink-s3-fs-base-1.7-SNAPSHOT.jar, and
flink-s3-fs-hadoop-1.7-SNAPSHOT.jar to the lib/ directory. Do I need to make
any changes to the Hadoop configuration?

Thanks and best regards,
Averell

java.lang.Exception: unable to establish the security context
	at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1118)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
	at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
	at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
	at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
	at
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
	at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
	... 1 more
Caused by: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
What great news.
Thanks for that, Kostas.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Averell,

There is no such “out-of-the-box” solution, but there is an open PR for adding 
S3 support to the StreamingFileSink [1].

Cheers,
Kostas

[1] https://github.com/apache/flink/pull/6795 <https://github.com/apache/flink/pull/6795>

> On Oct 5, 2018, at 11:14 AM, Averell <lv...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Thanks for the info.
> Just one more question regarding writing Parquet. I need to write my stream
> as Parquet to S3. As per this ticket
> https://issues.apache.org/jira/browse/FLINK-9752
> <https://issues.apache.org/jira/browse/FLINK-9752>  , it is not yet
> supported. Is there any ready-to-use solution that supports copying/moving
> files from HDFS to S3 (something like a trigger from Flink after it has
> finished writing to HDFS)?
> 
> Thanks and best regards,
> Averell 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Kostas,

Thanks for the info.
Just one more question regarding writing Parquet. I need to write my stream
as Parquet to S3. As per this ticket
https://issues.apache.org/jira/browse/FLINK-9752
<https://issues.apache.org/jira/browse/FLINK-9752>  , it is not yet
supported. Is there any ready-to-use solution that supports copying/moving
files from HDFS to S3 (something like a trigger from Flink after it has
finished writing to HDFS)?

Thanks and best regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Averell,

You are right that for Bulk Formats like Parquet, we roll on every checkpoint.

This is currently a limitation that has to do with the fact that bulk formats gather
and rely on metadata that they keep internally and which we cannot checkpoint
in Flink, as they do not expose it.

Setting the checkpoint interval affects how big your part files are going to be and,
in some cases, how efficient your compression is going to be. In some cases, the
more data there is to compress, the better the compression ratio.
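
As a rough sketch (assuming the Scala DataStream API), the part file size is
therefore steered through the checkpoint interval, e.g.:

	import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

	val env = StreamExecutionEnvironment.getExecutionEnvironment
	// With bulk formats rolling on every checkpoint, a 10-minute interval
	// roughly means one Parquet part per bucket every 10 minutes.
	env.enableCheckpointing(10 * 60 * 1000L)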

Regarding exposing withBucketCheckInterval(), you are right that it does not
serve much purpose for the moment.

Cheers,
Kostas

> On Oct 5, 2018, at 1:54 AM, Averell <lv...@gmail.com> wrote:
> 
> Hi Fabian, Kostas,
> 
> From the description of this ticket
> https://issues.apache.org/jira/browse/FLINK-9753, I understand that now my
> output Parquet file with StreamingFileSink will span multiple checkpoints.
> However, when I tried it (as in the code snippet below) I still see that
> one "part-X-X" file is created after each checkpoint. Is there any other
> configuration that I'm missing?
> 
> BTW, I have another question regarding
> StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the
> notes at the end of this page  StreamingFileSink
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html> 
> , bulk encoding can only be combined with OnCheckpointRollingPolicy, which
> rolls on every checkpoint. So setting that check interval makes no
> difference. Why, then, should we expose that withBucketCheckInterval method?
> 
> Thanks and best regards,
> Averell
> 
> 	def buildSink[T <: MyBaseRecord](outputPath: String)(implicit ct:
> ClassTag[T]): StreamingFileSink[T] = {
> 		StreamingFileSink.forBulkFormat(new Path(outputPath),
> ParquetAvroWriters.forReflectRecord(ct.runtimeClass)).asInstanceOf[StreamingFileSink.BulkFormatBuilder[T,
> String]]
> 				.withBucketCheckInterval(5L * 60L * 1000L)
> 				.withBucketAssigner(new DateTimeBucketAssigner[T]("yyyy-MM-dd--HH"))
> 				.build()
> 	}
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming to Parquet Files in HDFS

Posted by Averell <lv...@gmail.com>.
Hi Fabian, Kostas,

From the description of this ticket
https://issues.apache.org/jira/browse/FLINK-9753, I understand that now my
output Parquet file with StreamingFileSink will span multiple checkpoints.
However, when I tried it (as in the code snippet below) I still see that
one "part-X-X" file is created after each checkpoint. Is there any other
configuration that I'm missing?

BTW, I have another question regarding
StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the
notes at the end of this page  StreamingFileSink
<https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html> 
, bulk encoding can only be combined with OnCheckpointRollingPolicy, which
rolls on every checkpoint. So setting that check interval makes no
difference. Why, then, should we expose that withBucketCheckInterval method?

Thanks and best regards,
Averell

	def buildSink[T <: MyBaseRecord](outputPath: String)(implicit ct:
ClassTag[T]): StreamingFileSink[T] = {
		StreamingFileSink.forBulkFormat(new Path(outputPath),
ParquetAvroWriters.forReflectRecord(ct.runtimeClass)).asInstanceOf[StreamingFileSink.BulkFormatBuilder[T,
String]]
				.withBucketCheckInterval(5L * 60L * 1000L)
				.withBucketAssigner(new DateTimeBucketAssigner[T]("yyyy-MM-dd--HH"))
				.build()
	}




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Streaming to Parquet Files in HDFS

Posted by Biswajit Das <bi...@gmail.com>.
Nice to see this finally!

On Mon, Oct 1, 2018 at 1:53 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Bill,
>
> Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the
> previously mentioned StreamingFileSink [1], [2].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9753
> [2] https://issues.apache.org/jira/browse/FLINK-9750
>
> On Fri, Sep 28, 2018 at 11:36 PM hao gao <ha...@gmail.com> wrote:
>
>> Hi Bill,
>>
>> I wrote those two Medium posts you mentioned above, but clearly the
>> techlab one is much better.
>> I would suggest just "close the file when checkpointing", which is the
>> easiest way. If you use BucketingSink, you can modify the code to make it
>> work: just replace the code from line 691 to 693 with
>> closeCurrentPartFile()
>>
>> https://github.com/apache/flink/blob/release-1.3.2-rc1/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L691
>> This should guarantee exactly-once. You may have some files with an
>> underscore prefix when the Flink job fails, but those files are usually
>> ignored by query engines/readers, for example Presto.
>>
>> If you use 1.6 and later, I think the issue is already addressed
>> https://issues.apache.org/jira/browse/FLINK-9750
>>
>> Thanks
>> Hao
>>
>> On Fri, Sep 28, 2018 at 1:57 PM William Speirs <ws...@apache.org>
>> wrote:
>>
>>> I'm trying to stream log messages (syslog fed into Kafka) into Parquet
>>> files on HDFS via Flink. I'm able to read, parse, and construct objects for
>>> my messages in Flink; however, writing to Parquet is tripping me up. I do
>>> *not* need to have this be real-time; a delay of a few minutes, even up to
>>> an hour, is fine.
>>>
>>> I've found the following articles talking about this being very
>>> difficult:
>>> *
>>> https://medium.com/hadoop-noob/a-realtime-flink-parquet-data-warehouse-df8c3bd7401
>>> * https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
>>> *
>>> https://techlab.bol.com/how-not-to-sink-a-data-stream-to-files-journeys-from-kafka-to-parquet/
>>> *
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-td11123.html
>>>
>>> All of these posts speak of troubles with the checkpointing mechanisms
>>> and Parquet's need to perform batch writes. I'm not experienced enough with
>>> Flink's checkpointing or Parquet's file format to completely understand
>>> the issue. So my questions are as follows:
>>>
>>> 1) Is this possible in Flink in an exactly-once way? If not, is it
>>> possible in a way that _might_ cause duplicates during an error?
>>>
>>> 2) Is there another/better format to use other than Parquet that offers
>>> compression and the ability to be queried by something like Drill or Impala?
>>>
>>> 3) Any further recommendations for solving the overall problem:
>>> ingesting syslogs and writing them to a file(s) that is searchable by an
>>> SQL(-like) framework?
>>>
>>> Thanks!
>>>
>>> Bill-
>>>
>>
>>
>> --
>> Thanks
>>  - Hao
>>
>