Posted to user@flume.apache.org by ZORAIDA HIDALGO SANCHEZ <zo...@tid.es> on 2013/05/22 12:20:50 UTC

Missing headers when using AVRO Sink/Source

Dear all,

I wrote a custom interceptor to insert the timestamp header used by the HDFS sink.
First, I ran an example using a spooling directory source, a file channel and an HDFS sink. It worked fine.
Then I changed the setup to use two machines, configured as follows:

FIRST MACHINE:
SPOOLING source (with my custom interceptor)
FILE channel
AVRO sink

SECOND MACHINE:
AVRO source
FILE channel
HDFS sink

Now I get the following error (on the second machine):

ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:375)
at java.lang.Long.valueOf(Long.java:525)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
... 5 more

It looks like the header is missing.
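The stack trace shows the failure path: to expand `%Y/%m/%d` in `hdfs.path`, the sink reads the `timestamp` header and passes it to `Long.valueOf`, which throws `NumberFormatException` when the header is absent. The sketch below is a simplified model of that behaviour, not Flume's `BucketPath` code. The class name `TimestampBucketing` is made up, and it uses UTC only to keep the output deterministic (the real sink's time zone handling may differ).

```java
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

// Simplified model of time-based bucketing; NOT Flume's BucketPath implementation.
class TimestampBucketing {

    // Expands %Y, %m and %d from the "timestamp" header (epoch milliseconds, as a string).
    static String resolve(String pathTemplate, Map<String, String> headers) {
        String ts = headers.get("timestamp");   // null when the header was never set
        long millis = Long.valueOf(ts);         // Long.valueOf(null) throws NumberFormatException
        // UTC is an assumption of this sketch, chosen so the example output is deterministic.
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.setTimeInMillis(millis);
        return pathTemplate
                .replace("%Y", String.format("%04d", cal.get(Calendar.YEAR)))
                .replace("%m", String.format("%02d", cal.get(Calendar.MONTH) + 1))
                .replace("%d", String.format("%02d", cal.get(Calendar.DAY_OF_MONTH)));
    }

    public static void main(String[] args) {
        Map<String, String> stamped = new HashMap<>();
        stamped.put("timestamp", "1369180800000");  // 2013-05-22T00:00:00Z
        System.out.println(resolve("/user/user/flume/%Y/%m/%d", stamped)); // /user/user/flume/2013/05/22

        try {
            resolve("/user/user/flume/%Y/%m/%d", new HashMap<>());
        } catch (NumberFormatException e) {
            // Same root cause as the "Caused by" line in the trace above.
            System.out.println("no timestamp header: " + e);
        }
    }
}
```

The point of the model is that nothing upstream of the sink complains about a missing header; the problem only surfaces when the path is expanded.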

FIRST MACHINE CONF:
tier1.sources.source1.type     = spooldir
tier1.sources.source1.spoolDir = /home/user/flume/data
tier1.sources.source1.batchSize = 1000
tier1.sources.source1.bufferMaxLines = 3000
tier1.sources.source1.fileHeader = true
tier1.sources.source1.fileSuffix=.COMPLETED
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
tier1.sources.source1.interceptors.it1.preserveExisting=true
tier1.sources.source1.interceptors.it1.dateRegex=\\d{4}-\\d{2}-\\d{2}
tier1.sources.source1.interceptors.it1.dateFormat=yyyy-MM-dd

tier1.channels.channel1.type   = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000

tier1.sinks.sink1.type = avro
tier1.sinks.sink1.hostname = 10.95.108.245
tier1.sinks.sink1.port = 4141
tier1.sinks.sink1.channel = channel1

SECOND MACHINE CONF:
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 0.0.0.0
tier1.sources.source1.port = 4141
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type   = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.batchSize = 1000
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollTimeout = 10
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat = Text
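The source of `com.pdi.koios.flume.interceptors.DatetimeInterceptor` is not part of this thread. Judging only from its configuration (`dateRegex`, `dateFormat`, `preserveExisting`), its core logic might look roughly like the sketch below. The class `DatetimeHeaderSketch` is hypothetical and works on plain header maps and string bodies instead of Flume `Event` objects, so it runs without Flume on the classpath.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical reconstruction of a date-extracting interceptor; the real class is not shown.
class DatetimeHeaderSketch {
    private final Pattern dateRegex;        // the properties value \\d{4}-... is read as \d{4}-...
    private final String dateFormat;        // e.g. yyyy-MM-dd
    private final boolean preserveExisting;

    DatetimeHeaderSketch(String dateRegex, String dateFormat, boolean preserveExisting) {
        this.dateRegex = Pattern.compile(dateRegex);
        this.dateFormat = dateFormat;
        this.preserveExisting = preserveExisting;
    }

    // Sets "timestamp" (epoch millis as a string) from the first date found in the body.
    Map<String, String> intercept(Map<String, String> headers, String body) {
        if (preserveExisting && headers.containsKey("timestamp")) {
            return headers;                 // keep the value that is already there
        }
        Matcher m = dateRegex.matcher(body);
        if (m.find()) {
            try {
                // SimpleDateFormat is not thread-safe, so this sketch creates one per call.
                long millis = new SimpleDateFormat(dateFormat).parse(m.group()).getTime();
                headers.put("timestamp", Long.toString(millis));
            } catch (ParseException e) {
                // Header stays unset; the HDFS sink would then fail as in the trace above.
            }
        }
        return headers;
    }
}
```

For example, `new DatetimeHeaderSketch("\\d{4}-\\d{2}-\\d{2}", "yyyy-MM-dd", true).intercept(headers, line)` mirrors the `it1` settings above. `SimpleDateFormat` parses in the JVM's default time zone, so the resulting millisecond value depends on where the agent runs.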

________________________________

This message is intended exclusively for its addressee. We only send and receive email on the basis of the terms set out at:
http://www.tid.es/ES/PAGINAS/disclaimer.aspx

Re: Missing headers when using AVRO Sink/Source

Posted by Mike Percy <mp...@apache.org>.
FYI there is a stock timestamp interceptor, if you want to use that.

Mike
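In configuration terms, the stock interceptor is selected with `tier1.sources.source1.interceptors.it1.type = timestamp`. It writes the agent's current time in milliseconds into the `timestamp` header, and its `preserveExisting` option defaults to `false`. The simplified model below (not Flume's source; `StockTimestampSketch` is an illustrative name) makes the trade-off visible: buckets follow ingest time, whereas the custom interceptor buckets by the date found inside each record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Simplified model of the stock timestamp interceptor's behaviour; not Flume's source code.
class StockTimestampSketch {
    private final boolean preserveExisting; // the stock interceptor defaults this to false
    private final LongSupplier clock;       // System::currentTimeMillis in practice; injectable for tests

    StockTimestampSketch(boolean preserveExisting, LongSupplier clock) {
        this.preserveExisting = preserveExisting;
        this.clock = clock;
    }

    // Stamps the event with the agent's current time (ingest time, not the date in the body).
    Map<String, String> intercept(Map<String, String> headers) {
        if (preserveExisting && headers.containsKey("timestamp")) {
            return headers;
        }
        headers.put("timestamp", Long.toString(clock.getAsLong()));
        return headers;
    }

    public static void main(String[] args) {
        StockTimestampSketch it = new StockTimestampSketch(false, System::currentTimeMillis);
        System.out.println(it.intercept(new HashMap<>()));
    }
}
```

With ingest-time stamping, old log files dropped into the spool directory today would land in today's `%Y/%m/%d` directory.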


Re: Missing headers when using AVRO Sink/Source

Posted by ZORAIDA HIDALGO SANCHEZ <zo...@tid.es>.
Dear all,
Forget my email. There were some pending events in the channel without headers, and that is why I got the error. I removed all pending events and now it is working fine.
Thanks.
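This matches the stack trace: the Avro sink/source pair carries event headers across the hop, so nothing was lost in transit. Because the file channel persists events on disk (`checkpointDir`, `dataDirs`), events written before the header was being set can survive restarts and reach the HDFS sink later. The hypothetical check below (`PendingEventCheck` and its methods are illustrative names, not a Flume API) expresses the condition that separates those events from the ones the sink can bucket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic, not a Flume API: finds events the HDFS sink could not bucket.
class PendingEventCheck {

    // True when the "timestamp" header exists and parses as epoch milliseconds.
    static boolean hasUsableTimestamp(Map<String, String> headers) {
        String ts = headers.get("timestamp");
        if (ts == null) {
            return false;            // the case in this thread: events queued without headers
        }
        try {
            Long.parseLong(ts);
            return true;
        } catch (NumberFormatException e) {
            return false;            // present but non-numeric fails the same Long.valueOf call
        }
    }

    // Returns the header maps of events that would make the HDFS sink fail, in input order.
    static List<Map<String, String>> unbucketable(List<Map<String, String>> pending) {
        List<Map<String, String>> bad = new ArrayList<>();
        for (Map<String, String> headers : pending) {
            if (!hasUsableTimestamp(headers)) {
                bad.add(headers);
            }
        }
        return bad;
    }
}
```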
