Posted to dev@flume.apache.org by "Mike Percy (JIRA)" <ji...@apache.org> on 2013/06/22 03:45:20 UTC

[jira] [Resolved] (FLUME-2096) HDFS Not appending to a file, continuously rolling file.

     [ https://issues.apache.org/jira/browse/FLUME-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mike Percy resolved FLUME-2096.
-------------------------------

    Resolution: Not A Problem
    
> HDFS Not appending to a file, continuously rolling file.
> --------------------------------------------------------
>
>                 Key: FLUME-2096
>                 URL: https://issues.apache.org/jira/browse/FLUME-2096
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.3.0
>         Environment: Ubuntu 12.04 - 
> Flume 1.3.0-cdh4.3.0
> Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
> Revision: 6acdb16f9f70cb6537a70e5070985901ea4ad449
> Compiled by jenkins on Mon May 27 21:10:41 PDT 2013
> From source with checksum b71131b08ef7aea7a7930fbec9880881
>            Reporter: Josh Myers
>              Labels: hdfs, rollInterval
>
> Hi guys,
> We are sending JSON events from our pipeline into a Flume HTTP source.
> We have written a custom multiplexer and sink serializer. The events are
> being routed into the correct channels and consumed OK by the sinks. The
> custom serializer takes a JSON event and outputs a CSV. Files are being
> written to S3 (using s3n as HDFS), but rather than appending to the
> already-written CSV file, each event seems to be generating its own CSV.
> The output is what I would expect using rollCount 1; however, we do
> occasionally get several events (maybe 4) written per CSV. Please see
> below for config.
> Ideally we want to use a rollInterval of 24 hours, to generate a new .csv
> file every 24 hours, but have events flushed to the CSV file pretty quickly
> after being sent. So one CSV per day that is consistently appended
> with whatever events we throw in. We found, however, that with a
> rollInterval of 24 hours the events weren't being flushed often enough...
> Is this a bug?
> FYI - We have amended the below config to include hdfs.rollCount = 0 and hdfs.rollSize = 0, so those shouldn't be falling back to defaults, and we still have the same issue...
> Thanks.
> Josh
> ## Sources ###################################################
> agent.sources = http
> agent.sources.http.type = http
> agent.sources.http.bind = 0.0.0.0
> agent.sources.http.port = 4444
> agent.sources.http.channels = cappucino_s3_aggregate_profile_channel default_s3_channel cappucino_s3_trip_summary_channel
> ## Interceptors #################################################
> agent.sources.http.interceptors = itime ihost
> agent.sources.http.interceptors.itime.type = timestamp
> agent.sources.http.interceptors.ihost.type = host
> agent.sources.http.interceptors.ihost.useIP = false
> agent.sources.http.interceptors.ihost.preserveExisting= false
> agent.sources.http.interceptors.ihost.hostHeader = hostname
> ## Multiplex Channels Mapping ######################################
> agent.sources.http.selector.type = com.mydrivesolutions.flume.serialization.PipelineEventsChannelSelector
> agent.sources.http.selector.default = default_s3_channel
> ## Channels ########################################################
> agent.channels = cappucino_s3_aggregate_profile_channel cappucino_s3_trip_summary_channel default_s3_channel
> agent.channels.cappucino_s3_aggregate_profile_channel.type = file
> agent.channels.cappucino_s3_aggregate_profile_channel.capacity = 10000000
> agent.channels.cappucino_s3_aggregate_profile_channel.checkpointDir = /mnt/flume/cappucino_s3_aggregate_profile/checkpoint
> agent.channels.cappucino_s3_aggregate_profile_channel.dataDirs = /mnt/flume/cappucino_s3_aggregate_profile/data
> agent.channels.cappucino_s3_trip_summary_channel.type = file
> agent.channels.cappucino_s3_trip_summary_channel.capacity = 10000000
> agent.channels.cappucino_s3_trip_summary_channel.checkpointDir = /mnt/flume/cappucino_s3_trip_summary/checkpoint
> agent.channels.cappucino_s3_trip_summary_channel.dataDirs = /mnt/flume/cappucino_s3_trip_summary/data
> ## Sinks ###########################################################
> agent.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2 cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
> ## Serialize JSON events from the pipeline and write CSV to HDFS (we are using the S3 native FS as HDFS)
> ###############################################################################
> ## Cappuccino_s3_aggregate_profile Sinks #################################################
> agent.sinks.cappucino_s3_aggregate_profile_sink1.type = hdfs
> agent.sinks.cappucino_s3_aggregate_profile_sink1.channel = cappucino_s3_aggregate_profile_channel
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.format = DriverProfile
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.delimiter = ,
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.appendNewline = false
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.timeZone = UTC
> agent.sinks.cappucino_s3_aggregate_profile_sink2.type = hdfs
> agent.sinks.cappucino_s3_aggregate_profile_sink2.channel = cappucino_s3_aggregate_profile_channel
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.format = DriverProfile
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.delimiter = ,
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.appendNewline = false
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.timeZone = UTC
> ## Cappuccino_s3_trip_summary Sinks #################################################
> agent.sinks.cappucino_s3_trip_summary_sink1.type = hdfs
> agent.sinks.cappucino_s3_trip_summary_sink1.channel = cappucino_s3_trip_summary_channel
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.format = BodyCSV
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.delimiter = ,
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.appendNewline = false
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.timeZone = UTC
> agent.sinks.cappucino_s3_trip_summary_sink2.type = hdfs
> agent.sinks.cappucino_s3_trip_summary_sink2.channel = cappucino_s3_trip_summary_channel
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.format = BodyCSV
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.delimiter = ,
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.appendNewline = false
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.timeZone = UTC
> ## SinkGroups ###########################################################
> agent.sinkgroups = cappucino_s3_aggregate_profile_sinkgroup cappucino_s3_trip_summary_sinkgroup
> ## Cappuccino_s3_aggregate_profile Failover SinkGroup ##########################################
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.type = failover
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink1 = 10
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink2 = 5
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.maxpenalty = 30000
> ## Cappuccino_s3_trip_summary Failover SinkGroup ##########################################
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.sinks = cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.type = failover
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink1 = 10
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink2 = 5
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.maxpenalty = 30000
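
For reference, a minimal sketch of HDFS sink roll settings that would correspond to the behaviour described in the report (one file per day, rolled on time only). The sink name example_sink, the bucket name, and the batchSize value are hypothetical and are not taken from the reporter's setup. Two things in the posted config may be worth checking against it: hdfs.rollInterval = 20400 is 5 hours 40 minutes rather than 24 hours, and the %H.%M escapes in hdfs.filePrefix are resolved from each event's timestamp, so events stamped in different minutes may well end up in different files regardless of the roll settings.

```properties
# Hypothetical sink "example_sink"; names and values are illustrative assumptions.
agent.sinks.example_sink.type = hdfs
agent.sinks.example_sink.hdfs.path = s3n://example-bucket/reports/%Y-%m-%d
agent.sinks.example_sink.hdfs.fileType = DataStream

# Roll on time only: 24 hours = 86400 seconds.
agent.sinks.example_sink.hdfs.rollInterval = 86400
# 0 disables size-based and count-based rolling
# (the defaults are 1024 bytes and 10 events).
agent.sinks.example_sink.hdfs.rollSize = 0
agent.sinks.example_sink.hdfs.rollCount = 0

# Flush after fewer events than the default batch of 100 (value is an assumption).
agent.sinks.example_sink.hdfs.batchSize = 10

# No %H or %M in the prefix, so the file name stays the same for the whole day.
agent.sinks.example_sink.hdfs.filePrefix = DriverProfile.%y-%m-%d
agent.sinks.example_sink.hdfs.fileSuffix = .csv
agent.sinks.example_sink.hdfs.inUseSuffix = .tmp
agent.sinks.example_sink.hdfs.timeZone = UTC
```

Whether events written to an open file become visible before the file is closed depends on the underlying file system; this sketch makes no claim about s3n in that respect.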
