Posted to user@flume.apache.org by Scott Handerson <sj...@mailbox.org> on 2017/03/07 08:59:45 UTC

Write transformed data to file_roller sink with Morphline

I am trying to integrate Morphline with Flume. I am able to use the logger sink to print the data to the console. However, when I switch to the file_roll sink, the output folder only contains new lines (empty lines). How can I get the transformed data written by the file_roll sink?


Thanks.


Flume agent conf file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /path/to/source
a1.sources.r1.interceptors = m
a1.sources.r1.interceptors.m.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.r1.interceptors.m.morphlineFile = /path/to/conf/morphline.conf
a1.sources.r1.interceptors.m.morphlineId = morphline1

a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /path/to/output
a1.sinks.k1.sink.rollInterval = 3


a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Morphline conf file:

morphlines : [{
  id : morphline1
  importCommands : ["org.kitesdk.**"]
  commands : [
    {
      readCSV {
        separator : "\t"
        columns : [userID,movieID,rating,date_day,date_month,date_year,date_hour,date_minute,date_second]
        ignoreFirstLine : false
        trim : true
        charset : UTF-8
      }
    }
    {
      java {
        imports : "import java.util.*;"
        code: """
          record.removeAll("date_hour");
          record.removeAll("date_minute");
          record.removeAll("date_second");
          return child.process(record);
        """
      }
    }
    { logInfo { format : "(after columns are removed) record: {}", args : ["@{}"] } }
    {
      setValues {
        _attachment_body : "@{bodybak}"
        bodybak : []
        _attachment_mimetype : []
      }
    }
  ]
}]
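
For readers less familiar with Morphlines, the first two commands above (readCSV followed by the java command) roughly do the following. This is a plain-Java approximation for illustration only: MorphlineSketch and transform are hypothetical names, a single-valued LinkedHashMap stands in for Kite's multi-valued Record, and the sample row is made up. The logInfo and setValues commands are not modelled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical approximation of readCSV + the java command; Kite's Record API is NOT used.
final class MorphlineSketch {

    // Same column names, in the same order, as the readCSV "columns" list.
    static final List<String> COLUMNS = Arrays.asList(
            "userID", "movieID", "rating",
            "date_day", "date_month", "date_year",
            "date_hour", "date_minute", "date_second");

    public static Map<String, String> transform(String line) {
        // readCSV { separator : "\t", trim : true }: split the line and name each value.
        String[] values = line.split("\t", -1);
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.size() && i < values.length; i++) {
            record.put(COLUMNS.get(i), values[i].trim());
        }
        // java { record.removeAll(...) }: drop the three time-of-day fields.
        record.remove("date_hour");
        record.remove("date_minute");
        record.remove("date_second");
        return record;
    }

    public static void main(String[] args) {
        // Made-up input row, not taken from the original data set.
        String line = "42\t1193\t5\t07\t03\t2017\t09\t59\t45";
        // prints {userID=42, movieID=1193, rating=5, date_day=07, date_month=03, date_year=2017}
        System.out.println(transform(line));
    }
}
```

Only the six remaining fields reach the later commands; in the real pipeline they are multi-valued Record fields rather than plain strings.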

Re: Write transformed data to file_roller sink with Morphline

Posted by Denes Arvay <de...@cloudera.com>.
Hi Scott,

I don't think this is caused by the morphline. If the data appears properly
in the logger sink, it should appear the same way in any other sink.
I did some debugging on the file roll sink, and apparently it rotates empty
files as well: after the configured rollInterval it marks the current file
for rotation, and the next time it tries to process an event the file is
rotated even if there is nothing to write into it.

If you are interested in the details at the code level:
- here is the file rotation logic:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java#L147
- this kicks in even if there is no new event in the channel
- this method (process()) is called (through multiple levels) by the
SinkRunner, which sleeps for a maximum of 5 seconds if the sink returned
BACKOFF (i.e. there was no event to process):
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java#L148-L148
- the result is that you will get new (and apparently empty) files
every ~5 seconds
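
The interaction described in the list above can be sketched as a small simulation. This is a simplified model, not Flume's code: RollSimulation and fileOpenTimes are hypothetical names, the roll timer is modelled as a flag checked at the start of each simulated process() call, the channel is assumed to stay empty, and the 1-second back-off step is an assumption (only the 5-second maximum comes from the explanation above).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the rotate-on-next-process() behaviour with an idle channel.
final class RollSimulation {

    // Assumption: the runner's sleep grows by 1 s per consecutive empty poll,
    // capped at the 5 s maximum mentioned above.
    static final long BACKOFF_STEP_MS = 1_000;
    static final long MAX_BACKOFF_MS = 5_000;

    /** Simulated times (ms) at which a new output file is opened while the channel stays empty. */
    public static List<Long> fileOpenTimes(long rollIntervalMs, long durationMs) {
        if (rollIntervalMs <= 0) {
            throw new IllegalArgumentException("rollIntervalMs must be positive");
        }
        List<Long> opened = new ArrayList<>();
        boolean fileOpen = false;
        boolean shouldRotate = false;
        long nextTick = rollIntervalMs;  // the roll timer first fires one interval after start
        long consecutiveBackoffs = 0;
        long now = 0;

        while (now <= durationMs) {
            // The roll timer only sets a flag; it does not touch the file itself.
            while (nextTick <= now) {
                shouldRotate = true;
                nextTick += rollIntervalMs;
            }
            // Simulated process(): close the current file if flagged, even if nothing was written.
            if (shouldRotate && fileOpen) {
                fileOpen = false;
                shouldRotate = false;
            }
            if (!fileOpen) {
                opened.add(now);  // a new output file is opened here
                fileOpen = true;
            }
            // No event in the channel: process() reports BACKOFF and the runner sleeps.
            consecutiveBackoffs++;
            now += Math.min(consecutiveBackoffs * BACKOFF_STEP_MS, MAX_BACKOFF_MS);
        }
        return opened;
    }

    public static void main(String[] args) {
        // prints [0, 3000, 6000, 10000, 15000, 20000, 25000, 30000]
        System.out.println(fileOpenTimes(3_000, 30_000));
    }
}
```

With rollInterval = 3 s the gap between new files settles at 5 s once the back-off reaches its cap; calling fileOpenTimes(30_000, 60_000) instead yields [0, 30000, 60000], i.e. one new file per roll interval.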

IMHO this is a bug in the file roll sink; I'm going to file a JIRA ticket
to get it fixed.

Btw, the 3-second rollInterval seems a bit low to me (the default is 30
seconds). Is it intentional? If so, what do you want to achieve with it?
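
As a rough, back-of-the-envelope estimate based on the behaviour described above (assuming the channel stays empty and the runner has reached its ~5 s maximum back-off), a new empty file appears at the first poll after each roll tick, so on average the spacing between empty files is about the larger of rollInterval and the poll period:

```latex
\Delta t_{\mathrm{file}} \approx \max(\mathrm{rollInterval},\; 5\,\mathrm{s}),
\qquad
N_{\mathrm{hour}} \approx \frac{3600\,\mathrm{s}}{\Delta t_{\mathrm{file}}}
```

With rollInterval = 3 s that is roughly 5 s between files, or about 720 empty files per hour; with the 30 s default it is about 120 per hour.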

Kind regards,
Denes
