Posted to users@kafka.apache.org by S Ahmed <sa...@gmail.com> on 2014/06/13 22:51:22 UTC

re-writing old fetch request to work with 0.8 version

I found this embedded Kafka example online
(https://gist.github.com/mardambey/2650743), which I am re-writing to work
with 0.8.

Can someone help me re-write this portion:


  val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
  var offset = 0L

  var i = 0

  while (true) {
    val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)

    for (msg <- cons.fetch(fetchRequest)) {
      i = i + 1
      println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
      offset = msg.offset
    }
  }


I have this so far:

  val partition = 0
  var offset = 0L

  var i = 0
  while (true) {
    //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    val fetchRequest = new FetchRequestBuilder().addFetch(topic,
partition, offset, 1024).build()

    val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)

    val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
    println("consumed Message " + messageSet(0).message)

  }

This currently loops forever because it never advances the offset, so it
keeps fetching the same message. I'm confused because I believed there is no
more offset in 0.8, as things are more user friendly with an incrementing
counter.

Any help would be appreciated.

Re: re-writing old fetch request to work with 0.8 version

Posted by S Ahmed <sa...@gmail.com>.
Ok so now it is looping through the messages fine, and outputting the
actual message payload:

  while (true) {
    //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition,
offset, 1024).build()

    val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)

    val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
    println("consumed Message " +
Utils.readString(messageSet(0).message.payload, "UTF-8") )
    offset += 1

  }


Is there a way for it to not crash at the end?


*** Just to be clear, the idea is to run an embedded version in my web
application so I can verify the messages are being sent and processed in
development; this isn't a production idea of mine :)

consumed Message test199
consumed Message test200
[error] (run-main-0) java.lang.IndexOutOfBoundsException: 0
java.lang.IndexOutOfBoundsException: 0
    at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
    at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
    at com.debugging.jobs.KafkaEmbedded$delayedInit$body.apply(KafkaEmbedded.scala:92)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.debugging.jobs.KafkaEmbedded$.main(KafkaEmbedded.scala:24)
    at com.debugging.jobs.KafkaEmbedded.main(KafkaEmbedded.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
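
The trace points at ArrayBuffer.apply with index 0, which suggests that
messageSet(0) was called on an empty buffer: once the offset moves past the
last message, the fetch most likely comes back with nothing in it. Below is a
minimal sketch of a loop that checks for an empty batch before reading from
it. It does not talk to Kafka at all: FetchLoopSketch, log, fetch and
consumeAll are hypothetical stand-ins invented for this illustration, with
fetch playing the role of consumer1.fetch(...) followed by
messageSet(topic, partition).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for one partition: the message at
// index i is treated as living at offset i. Not part of the Kafka API.
object FetchLoopSketch {
  val log: Vector[String] = (1 to 5).map(i => "test" + i).toVector

  // Returns up to maxMessages (offset, payload) pairs starting at `offset`.
  // The result is empty once `offset` is past the end of the log.
  def fetch(offset: Long, maxMessages: Int): Seq[(Long, String)] =
    log.zipWithIndex
      .drop(offset.toInt)
      .take(maxMessages)
      .map { case (payload, idx) => (idx.toLong, payload) }

  // Consumes everything currently available and returns the payloads in order.
  def consumeAll(startOffset: Long): Seq[String] = {
    val consumed = ArrayBuffer.empty[String]
    var offset = startOffset
    var done = false
    while (!done) {
      val batch = fetch(offset, maxMessages = 2)
      if (batch.isEmpty) {
        // Nothing new at this offset: stop instead of indexing into an
        // empty collection. A long-running consumer would sleep and retry.
        done = true
      } else {
        // Handle every message in the batch, not only the first one.
        for ((msgOffset, payload) <- batch) {
          consumed += payload
          offset = msgOffset + 1 // next offset to request
        }
      }
    }
    consumed.toSeq
  }

  def main(args: Array[String]): Unit =
    consumeAll(0L).foreach(p => println("consumed Message " + p))
}
```

Applied to the loop above, the same idea would be to iterate over the whole
message set rather than reading messageSet(0), and to skip (or sleep) when it
is empty. As far as I know, each element of the 0.8 message set also carries
its own offset and a nextOffset, which would be a safer value to fetch from
than a hand-maintained offset += 1, but treat that as something to check
against the 0.8 API rather than as confirmed here.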



