Posted to dev@flume.apache.org by "zhiqiangzhao (JIRA)" <ji...@apache.org> on 2017/04/01 03:28:41 UTC
[jira] [Commented] (FLUME-3074) format error happened on windows when kafka sink is used
[ https://issues.apache.org/jira/browse/FLUME-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951969#comment-15951969 ]
zhiqiangzhao commented on FLUME-3074:
-------------------------------------
Thanks a lot for your comment.
Does that mean it is wrong to use the fileHeader value as the key?
What confuses me is that it actually runs normally on Linux.
What I want is the following:
a) I have many logs in different directories, and the file names look like timestamp.dat.
b) I want each group of directories to use one number as its partition id, to share the load, so that every log file is put into the configured Kafka partition.
Can you give some suggestions?
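For illustration, the layout described in a) and b) might be expressed with one spooldir source per directory group and a static interceptor that stamps a fixed partition number on every event. This is an untested sketch, not a confirmed fix: the directory names, the second source s2, the header name "partition", and the partition numbers are all assumptions. The file path is kept in its own header ("file") so that it no longer collides with the partition header.

```properties
tier1.sources = s1 s2
tier1.channels = c1
tier1.sinks = sk1

# Directory group A -> partition 0 (directory name is hypothetical)
tier1.sources.s1.type = spooldir
tier1.sources.s1.spoolDir = C:\\logs\\groupA
tier1.sources.s1.fileHeader = true
tier1.sources.s1.fileHeaderKey = file
tier1.sources.s1.interceptors = i1
tier1.sources.s1.interceptors.i1.type = static
tier1.sources.s1.interceptors.i1.key = partition
tier1.sources.s1.interceptors.i1.value = 0
tier1.sources.s1.channels = c1

# Directory group B -> partition 1 (directory name is hypothetical)
tier1.sources.s2.type = spooldir
tier1.sources.s2.spoolDir = C:\\logs\\groupB
tier1.sources.s2.fileHeader = true
tier1.sources.s2.fileHeaderKey = file
tier1.sources.s2.interceptors = i1
tier1.sources.s2.interceptors.i1.type = static
tier1.sources.s2.interceptors.i1.key = partition
tier1.sources.s2.interceptors.i1.value = 1
tier1.sources.s2.channels = c1

tier1.channels.c1.type = memory

# The sink reads the integer partition id from the "partition" header
tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sk1.topic = winSink
tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
tier1.sinks.sk1.partitionIdHeader = partition
tier1.sinks.sk1.channel = c1
```

With a layout like this, the header named by partitionIdHeader always holds a plain integer, whatever the operating system's path format is.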
> format error happened on windows when kafka sink is used
> --------------------------------------------------------
>
> Key: FLUME-3074
> URL: https://issues.apache.org/jira/browse/FLUME-3074
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.7.0
> Environment: windows 10
> Reporter: zhiqiangzhao
>
> When I use the Kafka sink to push logs from a Windows path to a Kafka server, I get the format error shown below.
> I think it is because the Windows path was parsed in Linux style.
> 17 Mar 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: Failed to publish events
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.EventDeliveryException: Non integer partition id specified
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214)
> ... 3 more
> Caused by: java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
> at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:580)
> at java.lang.Integer.parseInt(Integer.java:615)
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202)
> ... 3 more
> My config text is
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> tier1.sources = s1
> tier1.channels = c1
> tier1.sinks = sk1
>
> tier1.sources.s1.type = spooldir
> tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source
> tier1.sources.s1.fileHeader = true
> tier1.sources.s1.fileHeaderKey = key
> tier1.sources.s1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
> tier1.sources.s1.deserializer.maxBlobLength = 2000000000
> tier1.sources.s1.deserializer.maxBackoff=30000
> tier1.sources.s1.channels = c1
> tier1.channels.c1.type = memory
> tier1.channels.c1.capacity = 10004
> tier1.channels.c1.transactionCapacity = 100
> tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sk1.topic = winSink
> tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
> tier1.sinks.sk1.partitionIdHeader = key
> tier1.sinks.sk1.channel = c1
> tier1.sinks.sk1.batchSize = 20
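
The quoted config writes the file path into the header "key" (fileHeaderKey = key) and then tells the sink to read its partition id from that same header (partitionIdHeader = key). The stack trace shows Integer.parseInt failing on that value. The Java sketch below reproduces only the parsing step outside Flume. The class name, both helper methods, and the directory-to-partition map are hypothetical illustrations and are not part of the Flume API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionHeaderSketch {

    /** Returns true if the header value can be parsed as an integer partition id. */
    static boolean isValidPartitionId(String headerValue) {
        try {
            Integer.parseInt(headerValue);
            return true;
        } catch (NumberFormatException e) {
            // Same exception type as the root cause in the quoted stack trace.
            return false;
        }
    }

    /**
     * Hypothetical helper: chooses a partition id from the directory prefix of a path.
     * Map keys are directory prefixes written with forward slashes and a trailing "/".
     * Returns null when no configured directory matches.
     */
    static Integer partitionForPath(String filePath, Map<String, Integer> dirToPartition) {
        String normalized = filePath.replace('\\', '/');
        for (Map.Entry<String, Integer> entry : dirToPartition.entrySet()) {
            if (normalized.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String path = "C:\\Users\\smart\\Desktop\\source\\23.txt";

        // A file path is not an integer, so it cannot serve as a partition id.
        System.out.println("path usable as partition id: " + isValidPartitionId(path));

        // Assumed mapping from directory to partition number.
        Map<String, Integer> mapping = new LinkedHashMap<>();
        mapping.put("C:/Users/smart/Desktop/source/", 0);

        Integer partition = partitionForPath(path, mapping);
        System.out.println("mapped value usable as partition id: "
                + isValidPartitionId(String.valueOf(partition)));
    }
}
```

Whatever ends up in the partition header has to be a plain integer string, so a path-to-number mapping of this kind would need to run before the event reaches the sink, for example in an interceptor.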