You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Joe Stein <jo...@stealth.ly> on 2014/06/08 12:10:42 UTC

Re: Kafka producer in CSharp

To circle back around this thread there is an available native .net client
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.net now.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/


On Tue, May 13, 2014 at 5:01 AM, Margusja <ma...@roo.ee> wrote:

> Ok got some info myself.
>
> I can have fault tolerance - I can start kafka-http-endpoint using broker
> lists
> I can have ack - start using --sync
>
> But what is best practice in case if kafka-http-endpoint goes down?
>
> Start multiple kafka-http-endpoint's and in client side just control that
> kafka-http-endpoint is up? And if not up then using another?
>
>
> Best regards, Margus (Margusja) Roo
> +372 51 48 780
> http://margus.roo.ee
> http://ee.linkedin.com/in/margusroo
> skype: margusja
> ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
>
> On 13/05/14 10:49, Margusja wrote:
>
>> Thank you for response. I think HTTP is ok.
>> I have two more question in case of HTTP.
>> 1. Can I have fault tolerance in case I have two or more brokers?
>> 2. Can I ack that message is in queue?
>>
>> Best regards, Margus (Margusja) Roo
>> +372 51 48 780
>> http://margus.roo.ee
>> http://ee.linkedin.com/in/margusroo
>> skype: margusja
>> ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
>>
>> On 12/05/14 23:28, Joe Stein wrote:
>>
>>> The wire protocol has changed drastically since then.
>>>
>>> I don't know of any C# clients (there are none on the client library page
>>> nor have I heard of any being used in production but maybe there are
>>> some).
>>>
>>>
>>> For clients that use DotNet I often suggest that they use some HTTP
>>> producer/consumer
>>> https://cwiki.apache.org/confluence/display/KAFKA/
>>> Clients#Clients-HTTPREST
>>>
>>> /*******************************************
>>>   Joe Stein
>>>   Founder, Principal Consultant
>>>   Big Data Open Source Security LLC
>>>   http://www.stealth.ly
>>>   Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>> ********************************************/
>>>
>>>
>>> On Mon, May 12, 2014 at 12:34 PM, Margusja <ma...@roo.ee> wrote:
>>>
>>>  Hi
>>>>
>>>> I have kafka broker running (kafka_2.9.1-0.8.1.1)
>>>> All is working.
>>>>
>>>> One project requires producer is written in CSharp
>>>> I am not dot net programmer but I managed to write simple producer code
>>>> using https://github.com/kafka-dev/kafka/blob/master/clients/
>>>> csharp/README.md
>>>>
>>>> the code
>>>> ...
>>>> using System;
>>>> using System.Collections.Generic;
>>>> using System.Text;
>>>> using System.Threading.Tasks;
>>>> using Kafka.Client;
>>>>
>>>> namespace DemoProducer
>>>> {
>>>>      class Program
>>>>      {
>>>>          static void Main(string[] args)
>>>>          {
>>>>              string payload1 = "kafka 1.";
>>>>              byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
>>>>              Message msg1 = new Message(payloadData1);
>>>>
>>>>              string payload2 = "kafka 2.";
>>>>              byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
>>>>              Message msg2 = new Message(payloadData2);
>>>>
>>>>              Producer producer = new Producer("broker", 9092);
>>>>              producer.Send("kafkademo3", 0 ,  msg1 );
>>>>          }
>>>>      }
>>>> }
>>>> ...
>>>>
>>>> In broker side I am getting the error if I executing the code above:
>>>>
>>>> [2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because
>>>> of error (kafka.network.Processor)
>>>> java.nio.BufferUnderflowException
>>>>          at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
>>>>          at java.nio.ByteBuffer.get(ByteBuffer.java:694)
>>>>          at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
>>>>          at kafka.api.ProducerRequest$.readFrom(ProducerRequest.
>>>> scala:33)
>>>>          at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.
>>>> scala:36)
>>>>          at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.
>>>> scala:36)
>>>>          at kafka.network.RequestChannel$Request.<init>(RequestChannel.
>>>> scala:53)
>>>>          at kafka.network.Processor.read(SocketServer.scala:353)
>>>>          at kafka.network.Processor.run(SocketServer.scala:245)
>>>>          at java.lang.Thread.run(Thread.java:744)
>>>>
>>>>
>>>>
>>>> [2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56
>>>> because
>>>> of error (kafka.network.Processor)
>>>> java.io.IOException: Connection reset by peer
>>>>          at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>          at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>          at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>          at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>>>>          at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:
>>>> 379)
>>>>          at kafka.utils.Utils$.read(Utils.scala:375)
>>>>          at kafka.network.BoundedByteBufferReceive.readFrom(
>>>> BoundedByteBufferReceive.scala:54)
>>>>          at kafka.network.Processor.read(SocketServer.scala:347)
>>>>          at kafka.network.Processor.run(SocketServer.scala:245)
>>>>          at java.lang.Thread.run(Thread.java:744)
>>>>
>>>> I suspected that the problem is in the broker version
>>>> (kafka_2.9.1-0.8.1.1) so I downloaded kafka-0.7.1-incubating.
>>>> Now I was able to send messages using CSharp code.
>>>>
>>>> So is there workaround how I can use latest kafka version and CSharp ?
>>>> Or
>>>> What is the latest kafka version supporting CSharp producer?
>>>>
>>>> --
>>>> Best regards, Margus (Margusja) Roo
>>>> +372 51 48 780
>>>> http://margus.roo.ee
>>>> http://ee.linkedin.com/in/margusroo
>>>> skype: margusja
>>>> ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
>>>>
>>>>
>>>>
>>
>