You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "V Paul (JIRA)" <ji...@apache.org> on 2013/11/23 10:20:35 UTC

[jira] [Updated] (FLUME-2249) Interceptors are failing to modify/drop events with log4j appender and avro source.

     [ https://issues.apache.org/jira/browse/FLUME-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

V Paul updated FLUME-2249:
--------------------------

    Summary: Interceptors are failing to modify/drop events with log4j appender and avro source.  (was: Interceptors is failing to modify/drop events with log4j appender and avro source.)

> Interceptors are failing to modify/drop events with log4j appender and avro source.
> -----------------------------------------------------------------------------------
>
>                 Key: FLUME-2249
>                 URL: https://issues.apache.org/jira/browse/FLUME-2249
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.3.0, v1.4.0
>         Environment: Linux Mint 14 Nadia (based on Ubuntu 12.10, Quantal Quetzal)
> apache-flume-1.4.0
> apache-flume-1.3.0
> log4j-1.2.16
> flume-ng-log4jappender-1.4.0-jar-with-dependencies
>            Reporter: V Paul
>
> tested with Regex Filtering Interceptor, Static Interceptor. There is a bug for Regex Extractor Interceptor - FLUME-2041.
> h5. Configuration:
> h6. log4j.properties:
> {noformat}
> # Root logger option
> log4j.rootLogger=ALL, stdout
> #
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.Target=System.out
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
> #
> log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
> log4j.appender.flume.Hostname = localhost
> log4j.appender.flume.Port = 41414
> # configure a class's logger to output to the flume appender
> log4j.logger.generator.LogGenerator = INFO,flume
> log4j.appender.flume.layout=org.apache.log4j.PatternLayout
> log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
> {noformat}
> h6. secondExample.conf:
> {noformat}
> # Flume test file
> # Listens via Avro RPC on port 41414 and dumps data received to the log
> agent.channels = ch-1
> agent.sources = src-1
> agent.sinks = sink-1
> agent.channels.ch-1.type = memory
> agent.channels.ch-1.capacity = 10000000
> agent.channels.ch-1.transactionCapacity = 1000
> agent.sources.src-1.type = avro
> agent.sources.src-1.channels = ch-1
> agent.sources.src-1.bind = localhost
> agent.sources.src-1.port = 41414
> agent.sources.src-1.interceptors = intrcptr
> agent.sources.src-1.interceptors.intrcptr.type = regex_filter
> agent.sources.src-1.interceptors.intrcptr.regex = ERROR [0-4]:
> agent.sinks.sink-1.type = logger
> agent.sinks.sink-1.channel = ch-1
> {noformat}
> start command:
> {noformat}
> bin/flume-ng agent --conf conf --conf-file secondExample.conf --name agent -Dflume.root.logger=ALL,console
> {noformat}
> ----
> h5. Result:
> Logs seems to be OK:
> {noformat}
> SOURCES: {src-1={ parameters:{port=41414, interceptors.intrcptr.regex=ERROR [0-4]:, interceptors=intrcptr, channels=ch-1, type=avro, bind=localhost, interceptors.intrcptr.type=regex_filter} }}
> CHANNELS: {ch-1={ parameters:{transactionCapacity=1000, capacity=10000000, type=memory} }}
> AgentConfiguration created with Configuration stubs for which full validation was performed[agent]
> SINKS: {sink-1=ComponentConfiguration[sink-1]
>   CONFIG: 
>     CHANNEL:ch-1
> }
> 2013-11-22 23:41:48,689 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:ch-1
> 2013-11-22 23:41:48,689 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks sink-1
> 2013-11-22 23:41:48,690 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources src-1
> 2013-11-22 23:41:48,690 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent]
> 2013-11-22 23:41:48,693 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
> 2013-11-22 23:41:48,706 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel ch-1 type memory
> 2013-11-22 23:41:48,717 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel ch-1
> 2013-11-22 23:41:48,725 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source src-1, type avro
> 2013-11-22 23:41:48,779 (conf-file-poller-0) [INFO - org.apache.flume.interceptor.RegexFilteringInterceptor$Builder.build(RegexFilteringInterceptor.java:160)] Creating RegexFilteringInterceptor: regex=ERROR [0-4]:,excludeEvents=false
> 2013-11-22 23:41:48,782 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: sink-1, type: logger
> 2013-11-22 23:41:48,787 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel ch-1 connected to [src-1, sink-1]
> 2013-11-22 23:41:48,794 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{src-1=EventDrivenSourceRunner: { source:Avro source src-1: { bindAddress: localhost, port: 41414 } }} sinkRunners:{sink-1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@12679f2 counterGroup:{ name:null counters:{} } }} channels:{ch-1=org.apache.flume.channel.MemoryChannel{name: ch-1}} }
> 2013-11-22 23:41:48,795 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel ch-1
> 2013-11-22 23:41:48,881 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: CHANNEL, name: ch-1, registered successfully.
> 2013-11-22 23:41:48,882 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: CHANNEL, name: ch-1 started
> 2013-11-22 23:41:48,882 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink sink-1
> 2013-11-22 23:41:48,884 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source src-1
> 2013-11-22 23:41:48,885 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:192)] Starting Avro source src-1: { bindAddress: localhost, port: 41414 }...
> 2013-11-22 23:41:48,922 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
> 2013-11-22 23:41:49,442 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: SOURCE, name: src-1, registered successfully.
> 2013-11-22 23:41:49,448 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: SOURCE, name: src-1 started
> 2013-11-22 23:41:49,450 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:217)] Avro source src-1 started.
> {noformat}
> So it is expected that events containing text from "ERROR 0:" to "ERROR 4:" should be passed to the sink, and all other events should be dropped. But _no events are dropped_ :
> {noformat}
> 2013-11-22 23:56:07,044 (pool-3-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x00297e0d, /127.0.0.1:40355 => /127.0.0.1:41414] OPEN
> 2013-11-22 23:56:07,048 (pool-4-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x00297e0d, /127.0.0.1:40355 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
> 2013-11-22 23:56:07,048 (pool-4-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x00297e0d, /127.0.0.1:40355 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:40355
> 2013-11-22 23:56:07,640 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "40000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367544"}, "body": {"bytes": "2013-11-22 23:56:07 ERROR LogGenerator:33 - ERROR 4: 3735f28e-afee-48df-b60d-b3dbe9c0956f
> "}}
> 2013-11-22 23:56:07,713 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "40000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367712"}, "body": {"bytes": "2013-11-22 23:56:07 ERROR LogGenerator:33 - ERROR 3: f0e9cba1-095a-428e-bdab-1d7022dd449c
> "}}
> 2013-11-22 23:56:07,719 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "30000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367717"}, "body": {"bytes": "2013-11-22 23:56:07 WARN  LogGenerator:33 - ERROR 7: fde1ffce-6668-4364-a68e-afd015865be9
> "}}
> 2013-11-22 23:56:07,739 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367730"}, "body": {"bytes": "2013-11-22 23:56:07 INFO  LogGenerator:33 - ERROR 2: 3ea3b3ec-c23a-4dc2-b8d1-3643c46ac15e
> "}}
> 2013-11-22 23:56:07,742 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "30000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367741"}, "body": {"bytes": "2013-11-22 23:56:07 WARN  LogGenerator:33 - ERROR 9: 8682e837-60d0-4c81-9c1b-970ac23af700
> "}}
> 2013-11-22 23:56:07,752 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "30000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367751"}, "body": {"bytes": "2013-11-22 23:56:07 WARN  LogGenerator:33 - ERROR 4: a089b140-8f81-45c2-88d7-435f34f8c681
> "}}
> 2013-11-22 23:56:07,754 (pool-4-thread-1) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:300)] Avro source src-1: Received avro event: {"headers": {"flume.client.log4j.log.level": "20000", "flume.client.log4j.logger.name": "generator.LogGenerator", "flume.client.log4j.message.encoding": "UTF8", "flume.client.log4j.timestamp": "1385157367753"}, "body": {"bytes": "2013-11-22 23:56:07 INFO  LogGenerator:33 - ERROR 7: 57c35064-a6b1-4a9e-afc1-166091ffb68e
> "}}
> 2013-11-22 23:56:07,791 (pool-4-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x00297e0d, /127.0.0.1:40355 :> /127.0.0.1:41414] DISCONNECTED
> 2013-11-22 23:56:07,792 (pool-4-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x00297e0d, /127.0.0.1:40355 :> /127.0.0.1:41414] UNBOUND
> 2013-11-22 23:56:07,792 (pool-4-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x00297e0d, /127.0.0.1:40355 :> /127.0.0.1:41414] CLOSED
> 2013-11-22 23:56:07,792 (pool-4-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:40355 disconnected.
> {noformat}
> ----
> *Please note* that this bug is reproducible if Flume is configured like this:
> bq. Log4jAppender -> Avro source -> Regex interceptor -> Logger Sink
> As it was mentioned in FLUME-2041 it works as expected if Flume is configured with two agents like this:
> bq. Log4jAppender -> Avro source -> Avro sink -> Avro source -> Regex interceptor  -> Logger Sink



--
This message was sent by Atlassian JIRA
(v6.1#6144)