You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "Josh Myers (JIRA)" <ji...@apache.org> on 2013/06/19 15:50:19 UTC

[jira] [Created] (FLUME-2096) HDFS Not appending to a file, continuously rollng file.

Josh Myers created FLUME-2096:
---------------------------------

             Summary: HDFS Not appending to a file, continuously rollng file.
                 Key: FLUME-2096
                 URL: https://issues.apache.org/jira/browse/FLUME-2096
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.3.0
         Environment: Ubuntu 12.04 - 
Flume 1.3.0-cdh4.3.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 6acdb16f9f70cb6537a70e5070985901ea4ad449
Compiled by jenkins on Mon May 27 21:10:41 PDT 2013
>From source with checksum b71131b08ef7aea7a7930fbec9880881
            Reporter: Josh Myers


Hi guys,

We are sending JSON events from our pipeline into a flume http source. 
We have written a custom multiplexer and sink serializer. The events are 
being routed into the correct channels and consumed OK by the sinks. The 
custom serializer takes a JSON event and outputs a csv. Files are being 
written to s3 ( using s3n as hdfs ) but rather than appending the 
written csv file, each event seems to be generating it own csv. 

The output is what I would expect using rollCount 1, however we do 
occasionally get several events ( maybe 4 ) written per csv. Please see 
below for config.

Ideally we want to use rollInterval of 24 hours, to generate a new .csv 
file every 24 hours, but have events pretty quickly flushed to the csv 
file after being sent. So one csv' per day that is consistently appended 
with whatever events we throw in. We found however that with a 
rollInterval of 24 hours the events weren't being flushed often enough...

Is this a bug??

FYI - We have ammended the below config to include hdfs.rollCount = 0, hdfs.rollSize = 0 so those shouldn't be falling back to defaults and we still have the same issue...

Thanks.


Josh


## Sources ###################################################
agent.sources = http
agent.sources.http.type = http
agent.sources.http.bind = 0.0.0.0
agent.sources.http.port = 4444
agent.sources.http.channels = cappucino_s3_aggregate_profile_channel 
default_s3_channel cappucino_s3_trip_summary_channel

## Interceptors #################################################
agent.sources.http.interceptors = itime ihost
agent.sources.http.interceptors.itime.type = timestamp
agent.sources.http.interceptors.ihost.type = host
agent.sources.http.interceptors.ihost.useIP = false
agent.sources.http.interceptors.ihost.preserveExisting= false
agent.sources.http.interceptors.ihost.hostHeader = hostname

## Multiplex Channels Mapping ######################################
agent.sources.http.selector.type = 
com.mydrivesolutions.flume.serialization.PipelineEventsChannelSelector
agent.sources.http.selector.default = default_s3_channel

## Channels ########################################################
agent.channels = cappucino_s3_aggregate_profile_channel 
cappucino_s3_trip_summary_channel default_s3_channel

agent.channels.cappucino_s3_aggregate_profile_channel.type = file
agent.channels.cappucino_s3_aggregate_profile_channel.capacity = 10000000
agent.channels.cappucino_s3_aggregate_profile_channel.checkpointDir = 
/mnt/flume/cappucino_s3_aggregate_profile/checkpoint
agent.channels.cappucino_s3_aggregate_profile_channel.dataDirs = 
/mnt/flume/cappucino_s3_aggregate_profile/data

agent.channels.cappucino_s3_trip_summary_channel.type = file
agent.channels.cappucino_s3_trip_summary_channel.capacity = 10000000
agent.channels.cappucino_s3_trip_summary_channel.checkpointDir = 
/mnt/flume/cappucino_s3_trip_summary/checkpoint
agent.channels.cappucino_s3_trip_summary_channel.dataDirs = 
/mnt/flume/cappucino_s3_trip_summary/data

## Sinks ###########################################################
agent.sinks = cappucino_s3_aggregate_profile_sink1 
cappucino_s3_aggregate_profile_sink2 cappucino_s3_trip_summary_sink1 
cappucino_s3_trip_summary_sink2

## Serialize json events from the pipeline and write csv to HDFS (We are 
using s3 native FS as HDFS)
###############################################################################
## Capuccino_s3_aggregate_profile Sinks 
#################################################
agent.sinks.cappucino_s3_aggregate_profile_sink1.type = hdfs
agent.sinks.cappucino_s3_aggregate_profile_sink1.channel = 
cappucino_s3_aggregate_profile_channel
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.path = 
s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer = 
com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.columns = 
log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.format = 
DriverProfile
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.delimiter = ,
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.appendNewline = 
false
agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.distanceMeasure 
= MILES
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = 
DriverProfile.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.timeZone = UTC

agent.sinks.cappucino_s3_aggregate_profile_sink2.type = hdfs
agent.sinks.cappucino_s3_aggregate_profile_sink2.channel = 
cappucino_s3_aggregate_profile_channel
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.path = 
s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer = 
com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.columns = 
log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.format = 
DriverProfile
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.delimiter = ,
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.appendNewline = 
false
agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.distanceMeasure 
= MILES
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.filePrefix = 
DriverProfile.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.timeZone = UTC


## Cappuccino_s3_trip_summary Sinks 
#################################################
agent.sinks.cappucino_s3_trip_summary_sink1.type = hdfs
agent.sinks.cappucino_s3_trip_summary_sink1.channel = 
cappucino_s3_trip_summary_channel
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.path = 
s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_trip_summary_sink1.serializer = 
com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.columns = 
log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.format = BodyCSV
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.delimiter = ,
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.appendNewline = false
agent.sinks.cappucino_s3_trip_summary_sink1.serializer.distanceMeasure = 
MILES
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.filePrefix = 
TripSummary.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.timeZone = UTC

agent.sinks.cappucino_s3_trip_summary_sink2.type = hdfs
agent.sinks.cappucino_s3_trip_summary_sink2.channel = 
cappucino_s3_trip_summary_channel
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.path = 
s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileType = DataStream
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.writeFormat = Text
agent.sinks.cappucino_s3_trip_summary_sink2.serializer = 
com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.columns = 
log_type reporting_bucket subscription_id
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.format = BodyCSV
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.delimiter = ,
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.appendNewline = false
agent.sinks.cappucino_s3_trip_summary_sink2.serializer.distanceMeasure = 
MILES
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.maxOpenFiles = 5000
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.rollInterval = 20400
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.callTimeout = 60000
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileSuffix = .csv
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.inUseSuffix = .tmp
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.timeZone = UTC

## SinkGroups ###########################################################
agent.sinkgroups = cappucino_s3_aggregate_profile_sinkgroup 
cappucino_s3_trip_summary_sinkgroup

## Cappuccino_s3_aggregate_profile Failover SinkGroup 
##########################################
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.sinks = 
cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.type 
= failover
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink1 = 10
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink2 = 5
agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.maxpenalty = 30000

## Confused_s3_trip_summary Failover SinkGroup 
##########################################
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.sinks = 
cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.type = 
failover
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink1

= 10
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink2

= 5
agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.maxpenalty = 
30000


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira