Posted to dev@flume.apache.org by "Mike Percy (JIRA)" <ji...@apache.org> on 2013/06/22 03:45:20 UTC
[jira] [Resolved] (FLUME-2096) HDFS Not appending to a file, continuously rolling file.
[ https://issues.apache.org/jira/browse/FLUME-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mike Percy resolved FLUME-2096.
-------------------------------
Resolution: Not A Problem
> HDFS Not appending to a file, continuously rolling file.
> --------------------------------------------------------
>
> Key: FLUME-2096
> URL: https://issues.apache.org/jira/browse/FLUME-2096
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.3.0
> Environment: Ubuntu 12.04 -
> Flume 1.3.0-cdh4.3.0
> Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
> Revision: 6acdb16f9f70cb6537a70e5070985901ea4ad449
> Compiled by jenkins on Mon May 27 21:10:41 PDT 2013
> From source with checksum b71131b08ef7aea7a7930fbec9880881
> Reporter: Josh Myers
> Labels: hdfs, rollInterval
>
> Hi guys,
> We are sending JSON events from our pipeline into a Flume HTTP source.
> We have written a custom multiplexer and sink serializer. The events are
> being routed into the correct channels and consumed OK by the sinks. The
> custom serializer takes a JSON event and outputs a csv. Files are being
> written to S3 (using s3n as HDFS), but rather than appending to the
> written csv file, each event seems to be generating its own csv.
> The output is what I would expect with rollCount = 1; however, we do
> occasionally get several events (maybe 4) written per csv. Please see
> below for the config.
> Ideally we want to use a rollInterval of 24 hours, to generate a new .csv
> file every 24 hours, but have events flushed to the csv file fairly
> quickly after being sent. So: one csv per day that is consistently
> appended with whatever events we throw in. We found, however, that with a
> rollInterval of 24 hours the events weren't being flushed often enough...
> Is this a bug?
> FYI - we have amended the config below to include hdfs.rollCount = 0 and
> hdfs.rollSize = 0, so those shouldn't be falling back to defaults, and we
> still have the same issue...
> Thanks.
> Josh
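> [Editorial note: the one-file-per-day behaviour described above maps onto
> the HDFS sink's standard roll knobs: hdfs.rollInterval is in seconds (so
> 24 hours is 86400), hdfs.rollCount = 0 and hdfs.rollSize = 0 disable the
> count- and size-based roll triggers, and hdfs.batchSize controls how many
> events are taken from the channel per flush. A minimal illustrative
> sketch (the sink name and values here are hypothetical, not taken from
> the report):
>
>     agent.sinks.example_sink.hdfs.rollInterval = 86400
>     agent.sinks.example_sink.hdfs.rollCount = 0
>     agent.sinks.example_sink.hdfs.rollSize = 0
>     agent.sinks.example_sink.hdfs.batchSize = 10
>
> Note, however, that s3n is an object store, not real HDFS: it does not
> support append, so flushed data generally only becomes visible once a
> file is closed, and frequent-flush-into-one-daily-file behaviour may not
> carry over from HDFS to S3.]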
> ## Sources ###################################################
> agent.sources = http
> agent.sources.http.type = http
> agent.sources.http.bind = 0.0.0.0
> agent.sources.http.port = 4444
> agent.sources.http.channels = cappucino_s3_aggregate_profile_channel default_s3_channel cappucino_s3_trip_summary_channel
> ## Interceptors #################################################
> agent.sources.http.interceptors = itime ihost
> agent.sources.http.interceptors.itime.type = timestamp
> agent.sources.http.interceptors.ihost.type = host
> agent.sources.http.interceptors.ihost.useIP = false
> agent.sources.http.interceptors.ihost.preserveExisting = false
> agent.sources.http.interceptors.ihost.hostHeader = hostname
> ## Multiplex Channels Mapping ######################################
> agent.sources.http.selector.type = com.mydrivesolutions.flume.serialization.PipelineEventsChannelSelector
> agent.sources.http.selector.default = default_s3_channel
> ## Channels ########################################################
> agent.channels = cappucino_s3_aggregate_profile_channel cappucino_s3_trip_summary_channel default_s3_channel
> agent.channels.cappucino_s3_aggregate_profile_channel.type = file
> agent.channels.cappucino_s3_aggregate_profile_channel.capacity = 10000000
> agent.channels.cappucino_s3_aggregate_profile_channel.checkpointDir = /mnt/flume/cappucino_s3_aggregate_profile/checkpoint
> agent.channels.cappucino_s3_aggregate_profile_channel.dataDirs = /mnt/flume/cappucino_s3_aggregate_profile/data
> agent.channels.cappucino_s3_trip_summary_channel.type = file
> agent.channels.cappucino_s3_trip_summary_channel.capacity = 10000000
> agent.channels.cappucino_s3_trip_summary_channel.checkpointDir = /mnt/flume/cappucino_s3_trip_summary/checkpoint
> agent.channels.cappucino_s3_trip_summary_channel.dataDirs = /mnt/flume/cappucino_s3_trip_summary/data
> ## Sinks ###########################################################
> agent.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2 cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
> ## Serialize json events from the pipeline and write csv to HDFS (We are using s3 native FS as HDFS)
> ###############################################################################
> ## Cappuccino_s3_aggregate_profile Sinks #################################################
> agent.sinks.cappucino_s3_aggregate_profile_sink1.type = hdfs
> agent.sinks.cappucino_s3_aggregate_profile_sink1.channel = cappucino_s3_aggregate_profile_channel
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.format = DriverProfile
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.delimiter = ,
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.appendNewline = false
> agent.sinks.cappucino_s3_aggregate_profile_sink1.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_aggregate_profile_sink1.hdfs.timeZone = UTC
> agent.sinks.cappucino_s3_aggregate_profile_sink2.type = hdfs
> agent.sinks.cappucino_s3_aggregate_profile_sink2.channel = cappucino_s3_aggregate_profile_channel
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.path = s3n://mydrive-cappucino-reports/driver-profiles/%Y-%m-%d
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.format = DriverProfile
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.delimiter = ,
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.appendNewline = false
> agent.sinks.cappucino_s3_aggregate_profile_sink2.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.filePrefix = DriverProfile.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_aggregate_profile_sink2.hdfs.timeZone = UTC
> ## Cappuccino_s3_trip_summary Sinks #################################################
> agent.sinks.cappucino_s3_trip_summary_sink1.type = hdfs
> agent.sinks.cappucino_s3_trip_summary_sink1.channel = cappucino_s3_trip_summary_channel
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.format = BodyCSV
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.delimiter = ,
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.appendNewline = false
> agent.sinks.cappucino_s3_trip_summary_sink1.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_trip_summary_sink1.hdfs.timeZone = UTC
> agent.sinks.cappucino_s3_trip_summary_sink2.type = hdfs
> agent.sinks.cappucino_s3_trip_summary_sink2.channel = cappucino_s3_trip_summary_channel
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.path = s3n://mydrive-cappucino-reports/trip-summaries/%Y-%m-%d
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileType = DataStream
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.writeFormat = Text
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer = com.mydrivesolutions.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.columns = log_type reporting_bucket subscription_id
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.format = BodyCSV
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.delimiter = ,
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.appendNewline = false
> agent.sinks.cappucino_s3_trip_summary_sink2.serializer.distanceMeasure = MILES
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.maxOpenFiles = 5000
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.rollInterval = 20400
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.callTimeout = 60000
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.fileSuffix = .csv
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.inUseSuffix = .tmp
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.filePrefix = TripSummary.%y-%m-%d.%H.%M
> agent.sinks.cappucino_s3_trip_summary_sink2.hdfs.timeZone = UTC
> ## SinkGroups ###########################################################
> agent.sinkgroups = cappucino_s3_aggregate_profile_sinkgroup cappucino_s3_trip_summary_sinkgroup
> ## Cappuccino_s3_aggregate_profile Failover SinkGroup ##########################################
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.sinks = cappucino_s3_aggregate_profile_sink1 cappucino_s3_aggregate_profile_sink2
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.type = failover
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink1 = 10
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.priority.cappucino_s3_aggregate_profile_sink2 = 5
> agent.sinkgroups.cappucino_s3_aggregate_profile_sinkgroup.processor.maxpenalty = 30000
> ## Cappuccino_s3_trip_summary Failover SinkGroup ##########################################
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.sinks = cappucino_s3_trip_summary_sink1 cappucino_s3_trip_summary_sink2
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.type = failover
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink1 = 10
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.priority.cappucino_s3_trip_summary_sink2 = 5
> agent.sinkgroups.cappucino_s3_trip_summary_sinkgroup.processor.maxpenalty = 30000
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira