Posted to dev@flume.apache.org by "prayagupd (JIRA)" <ji...@apache.org> on 2017/01/10 19:15:58 UTC

[jira] [Updated] (FLUME-3040) Flume client can not append events batch, deflate compression is failing

     [ https://issues.apache.org/jira/browse/FLUME-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

prayagupd updated FLUME-3040:
-----------------------------
    Environment: 
jvm 1.8, flume 1.6.

{code}
java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build pxa6480sr3fp20-20161019_02(SR3 FP20))
IBM J9 VM (build 2.8, JRE 1.8.0 Linux amd64-64 Compressed References 20161013_322271 (JIT enabled, AOT enabled)
J9VM - R28_Java8_SR3_20161013_1635_B322271
JIT  - tr.r14.java.green_20161011_125790
GC   - R28_Java8_SR3_20161013_1635_B322271_CMPRSS
J9CL - 20161013_322271)
JCL - 20161018_01 based on Oracle jdk8u111-b14
{code}

  was:
I'm using jvm 1.8 and flume 1.6.

{code}
java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build pxa6480sr3fp20-20161019_02(SR3 FP20))
IBM J9 VM (build 2.8, JRE 1.8.0 Linux amd64-64 Compressed References 20161013_322271 (JIT enabled, AOT enabled)
J9VM - R28_Java8_SR3_20161013_1635_B322271
JIT  - tr.r14.java.green_20161011_125790
GC   - R28_Java8_SR3_20161013_1635_B322271_CMPRSS
J9CL - 20161013_322271)
JCL - 20161018_01 based on Oracle jdk8u111-b14
{code}


> Flume client can not append events batch, deflate compression is failing
> ------------------------------------------------------------------------
>
>                 Key: FLUME-3040
>                 URL: https://issues.apache.org/jira/browse/FLUME-3040
>             Project: Flume
>          Issue Type: Question
>          Components: File Channel
>    Affects Versions: v1.6.0
>         Environment: jvm 1.8, flume 1.6.
> {code}
> java -version
> java version "1.8.0"
> Java(TM) SE Runtime Environment (build pxa6480sr3fp20-20161019_02(SR3 FP20))
> IBM J9 VM (build 2.8, JRE 1.8.0 Linux amd64-64 Compressed References 20161013_322271 (JIT enabled, AOT enabled)
> J9VM - R28_Java8_SR3_20161013_1635_B322271
> JIT  - tr.r14.java.green_20161011_125790
> GC   - R28_Java8_SR3_20161013_1635_B322271_CMPRSS
> J9CL - 20161013_322271)
> JCL - 20161018_01 based on Oracle jdk8u111-b14
> {code}
>            Reporter: prayagupd
>
> I'm getting the following "Failed to send batch" error at NettyAvroRpcClient#appendBatch:
> {code}
> org.apache.flume.EventDeliveryException: Failed to send events
> 	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:785)
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch
> 	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320)
> 	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373)
> 	... 3 more
> Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from remote handler
> 	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402)
> 	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379)
> 	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308)
> 	... 4 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException
> 	at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
> 	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394)
> 	... 6 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException
> 	at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127)
> 	at org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637)
> 	at org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860)
> 	at org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724)
> 	at org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168)
> 	at org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601)
> 	at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140)
> 	at org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282)
> 	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66)
> 	at org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35)
> 	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
> 	at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322)
> 	at org.jboss.netty.channel.Channels.write(Channels.java:725)
> 	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
> 	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
> 	at org.jboss.netty.channel.Channels.write(Channels.java:704)
> 	at org.jboss.netty.channel.Channels.write(Channels.java:671)
> 	at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248)
> 	at org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515)
> 	at org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476)
> 	at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
> 	at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
> 	at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84)
> 	at com.sun.proxy.$Proxy29.appendBatch(Unknown Source)
> 	at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353)
> 	at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:277)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	... 1 more
> {code}
> My Flume config is:
> {code}
> client.sources=source_1 source_2 src-flume-log src-flume-metrics 
> client.channels = file-channel-1 file-metrics-1 file-flumelog-1
> client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1
> #source definitions
> client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource
> client.sources.source_1.filegroups = f1
> client.sources.source_1.fileHeader = true
> client.sources.source_1.filegroups.f1 = /u01/app/Logs/microservice1/logging12.log
> client.sources.source_1.idleTimeout=3000
> client.sources.source_1.batchSize=100
> client.sources.source_1.channels=file-channel-1
> client.sources.source_1.multiline=false
> client.sources.source_1.bufferSize=1048576
> client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json
> client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields
> client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter
> client.sources.source_1.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService1.*
> client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false
> client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder
> client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder
> client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder
> client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder
> client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType
> client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1
> client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource
> client.sources.source_2.filegroups = f1
> client.sources.source_2.fileHeader = true
> client.sources.source_2.filegroups.f1 = /u01/app/Logs/microservice2/logging12.log
> client.sources.source_2.idleTimeout=3000
> client.sources.source_2.batchSize=100
> client.sources.source_2.channels=file-channel-1
> client.sources.source_2.multiline=false
> client.sources.source_2.bufferSize=1048576
> client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json
> client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields
> client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter
> client.sources.source_2.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService2.*
> client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false
> client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder
> client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder
> client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder
> client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder
> client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType
> client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2
> client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource
> client.sources.src-flume-log.filegroups = f1
> client.sources.src-flume-log.fileHeader = true
> client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log
> client.sources.src-flume-log.idleTimeout=3000
> client.sources.src-flume-log.batchSize=100
> client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json
> client.sources.src-flume-log.channels=file-flumelog-1
> client.sources.src-flume-log.interceptors = i1 
> client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder
> client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType
> client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume
> client.sources.src-flume-metrics.type = exec
> client.sources.src-flume-metrics.command = while true ; do echo  ; sleep 5; done
> client.sources.src-flume-metrics.shell = /bin/sh -c
> client.sources.src-flume-metrics.channels = file-metrics-1
> client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json
> client.sources.src-flume-metrics.interceptors = i1 i2
> client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder
> client.sources.src-flume-metrics.interceptors.i1.keyList = appCategory|appName|logEnv|logType
> client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics
> client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder
> #channel definitions
> client.channels.file-channel-1.type=file
> client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1
> client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1
> client.channels.file-channel-1.checkpointOnClose=true
> client.channels.file-metrics-1.type=file
> client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1
> client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1
> client.channels.file-metrics-1.checkpointOnClose=true
> client.channels.file-flumelog-1.type=file
> client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1
> client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1
> client.channels.file-flumelog-1.checkpointOnClose=true
> #sink definitions
> client.sinks.avro-sink-1.type=avro
> client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net
> client.sinks.avro-sink-1.port=443
> client.sinks.avro-sink-1.compression-type=deflate
> client.sinks.avro-sink-1.channel=file-channel-1
> client.sinks.avro-sink-1.ssl=false
> client.sinks.avro-sink-1.trust-all-certs=false
> client.sinks.avro-metrics-sink-1.type=avro
> client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net
> client.sinks.avro-metrics-sink-1.port=443
> client.sinks.avro-metrics-sink-1.compression-type=deflate
> client.sinks.avro-metrics-sink-1.channel=file-metrics-1
> client.sinks.avro-metrics-sink-1.ssl=false
> client.sinks.avro-metrics-sink-1.trust-all-certs=false
> client.sinks.avro-flumelog-sink-1.type=avro
> client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net
> client.sinks.avro-flumelog-sink-1.port=443
> client.sinks.avro-flumelog-sink-1.compression-type=deflate
> client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1
> client.sinks.avro-flumelog-sink-1.ssl=false
> client.sinks.avro-flumelog-sink-1.trust-all-certs=false
> {code}
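> One thing worth checking is that compression must match on both ends: the collector's Avro source has its own compression-type setting, and a deflate-encoder problem can be isolated by temporarily disabling compression on one sink. A hypothetical diagnostic override (sink name as in the config above; compression-level 0–9 is the standard Avro sink knob, default 6):
> {code}
> # temporarily disable deflate on one sink to test whether compression triggers the failure
> client.sinks.avro-sink-1.compression-type = none
> # or keep deflate but pin the level explicitly
> # client.sinks.avro-sink-1.compression-level = 6
> {code}
> If the failure disappears with compression-type=none, that points at the deflate path rather than the channel or payload.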
> It actually works for a few hours, but then the same appendBatch failure recurs, which seems to be a compression issue.
> At this time, I don't see anything in the queueset directory:
> {code}
> ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/
> total 0
> cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta 
> 0
> 4??Y?%?Z*
> *
> cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes 
> J?6?K??y?u.#H?
> cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs 
> J?6?K??y?u.#H?
> {code}
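The checkpoint and inflight files above are binary, so `cat` prints mojibake; a hex view is more informative. A minimal sketch on a stand-in file (the real paths are the checkpointDir files shown above):

```shell
# Checkpoint/inflight files are binary; use a hex dump instead of cat.
# Stand-in file so the command is self-contained:
printf '\x00\x01\xff' > /tmp/sample.bin
hexdump -C /tmp/sample.bin
# in practice, e.g.:
# hexdump -C /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta
```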
> The ends of the filechannel1 dataDir files log-2 and log-3 are all filled with unprintable bytes:
> {code}
> ??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
?????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)