Posted to dev@flume.apache.org by "zhiqiangzhao (JIRA)" <ji...@apache.org> on 2017/04/01 03:28:41 UTC

[jira] [Commented] (FLUME-3074) format error happened on windows when kafka sink is used

    [ https://issues.apache.org/jira/browse/FLUME-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951969#comment-15951969 ] 

zhiqiangzhao commented on FLUME-3074:
-------------------------------------

Thanks a lot for your comment.
Does that mean it is wrong to use the fileHeader value as the key?
What confuses me is that the same setup actually runs normally on Linux.

What I want is as follows:
a) I have many logs in different directories, and the file names look like timestamp.dat.
b) I want to assign one partition id to each group of directories in order to share the load, so that every log file is sent to the Kafka partition configured for its directory.

Can you give some suggestions?
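
To make the goal in (b) concrete, here is a minimal sketch of the mapping I have in mind. It is only an illustration under assumptions: the class PartitionMapper, its methods, and the example directories are hypothetical and are not part of Flume. The idea is that the header named by partitionIdHeader should carry a small integer as a string instead of the file path, because the stack trace in the issue shows KafkaSink passing that header value to Integer.parseInt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flume: maps a log file path to the
// Kafka partition id configured for the directory that contains it.
final class PartitionMapper {
    // Directory (normalized to forward slashes) -> partition id.
    private final Map<String, Integer> dirToPartition = new LinkedHashMap<>();
    private final int defaultPartition;

    PartitionMapper(int defaultPartition) {
        this.defaultPartition = defaultPartition;
    }

    // Register a directory and the partition its files should go to.
    void assign(String directory, int partitionId) {
        dirToPartition.put(normalize(directory), partitionId);
    }

    // Returns the partition id as a decimal string, i.e. a value that
    // Integer.parseInt accepts, unlike the raw file path.
    String partitionHeaderFor(String filePath) {
        String path = normalize(filePath);
        for (Map.Entry<String, Integer> e : dirToPartition.entrySet()) {
            // Append "/" so that ".../app1" does not match ".../app10".
            if (path.startsWith(e.getKey() + "/")) {
                return Integer.toString(e.getValue());
            }
        }
        return Integer.toString(defaultPartition);
    }

    // Treat Windows and Linux separators alike; drop trailing slashes.
    // Assumption: paths are compared case-sensitively.
    private static String normalize(String p) {
        String s = p.replace('\\', '/');
        while (s.endsWith("/")) {
            s = s.substring(0, s.length() - 1);
        }
        return s;
    }

    public static void main(String[] args) {
        PartitionMapper mapper = new PartitionMapper(0);
        mapper.assign("C:\\logs\\app1", 1);   // example directories only
        mapper.assign("/var/log/app2", 2);
        System.out.println(mapper.partitionHeaderFor("C:\\logs\\app1\\1490000000.dat"));
        System.out.println(mapper.partitionHeaderFor("/var/log/app2/1490000001.dat"));
    }
}
```

Something like this would presumably have to run before the sink, for example in a custom interceptor that writes the returned string into the header configured as partitionIdHeader. I have not verified that part, so please treat it as an assumption.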


> format error happened on windows when kafka sink is used
> --------------------------------------------------------
>
>                 Key: FLUME-3074
>                 URL: https://issues.apache.org/jira/browse/FLUME-3074
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.7.0
>         Environment: windows 10
>            Reporter: zhiqiangzhao
>
> When I use the Kafka sink to push logs from a Windows path to a Kafka server, I get the format error shown below.
> I think it is because the Windows path was parsed in Linux style.
> 17 三月 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: Failed to publish events
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.EventDeliveryException: Non integer partition id specified
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214)
> 	... 3 more
> Caused by: java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
> 	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> 	at java.lang.Integer.parseInt(Integer.java:580)
> 	at java.lang.Integer.parseInt(Integer.java:615)
> 	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202)
> 	... 3 more
> My configuration is:
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> tier1.sources = s1
> tier1.channels = c1  
> tier1.sinks = sk1  
>   
> tier1.sources.s1.type = spooldir  
> tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source
> tier1.sources.s1.fileHeader = true  
> tier1.sources.s1.fileHeaderKey = key
> tier1.sources.s1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
> tier1.sources.s1.deserializer.maxBlobLength = 2000000000
> tier1.sources.s1.deserializer.maxBackoff=30000
> tier1.sources.s1.channels = c1  
> tier1.channels.c1.type = memory  
> tier1.channels.c1.capacity = 10004  
> tier1.channels.c1.transactionCapacity = 100
> tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sk1.topic = winSink
> tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
> tier1.sinks.sk1.partitionIdHeader = key
> tier1.sinks.sk1.channel = c1
> tier1.sinks.sk1.batchSize = 20



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)