Posted to users@kafka.apache.org by S Ahmed <sa...@gmail.com> on 2014/06/13 22:51:22 UTC
re-writing old fetch request to work with 0.8 version
I found this embedded Kafka example online (
https://gist.github.com/mardambey/2650743), which I am rewriting to work
with 0.8.
Can someone help me rewrite this portion:
val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
var offset = 0L
var i = 0
while (true) {
  val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
  for (msg <- cons.fetch(fetchRequest)) {
    i = i + 1
    println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
    offset = msg.offset
  }
}
I have this so far:
val partition = 0
var offset = 0L
var i = 0
while (true) {
  //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
  val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024).build()
  val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)
  val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
  println("consumed Message " + messageSet(0).message)
}
This currently loops forever b/c it isn't incrementing the offset or anything.
I'm confused b/c I believe there is no more offset, as things are more
user-friendly with an incrementing counter.
Any help would be appreciated.
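For context, the offset has not gone away in 0.8: it changed from a byte position to a logical per-message sequence number, and each fetched item pairs a message with its own offset. A minimal sketch of advancing the fetch position from a batch follows. It uses a hypothetical `Fetched` case class as a stand-in for the client's message-and-offset pairs, and `consumeBatch` is an illustrative name, not Kafka API.

```scala
object OffsetSketch {
  // Hypothetical stand-in for one fetched (message, offset) pair; not Kafka API.
  final case class Fetched(offset: Long, payload: String) {
    // The next position to request is one past this message's offset.
    def nextOffset: Long = offset + 1
  }

  // Print every message in the batch and return the offset to fetch from next.
  // An empty batch leaves the position unchanged.
  def consumeBatch(batch: Seq[Fetched], current: Long): Long =
    batch.foldLeft(current) { (_, m) =>
      println("consumed [" + m.offset + "]: " + m.payload)
      m.nextOffset
    }
}
```

The point of the sketch is that the whole batch is processed, not just its first element, and that the next request starts one past the last message actually seen.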
Re: re-writing old fetch request to work with 0.8 version
Posted by S Ahmed <sa...@gmail.com>.
Ok so now it is looping through the messages fine, and outputting the
actual message payload:
while (true) {
  //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
  val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024).build()
  val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)
  val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
  println("consumed Message " + Utils.readString(messageSet(0).message.payload, "UTF-8"))
  offset += 1
}
Is there a way for it to not crash at the end?
*** Just to be clear, the idea is to run an embedded version in my web
application so I can verify the messages are being sent and processed in
development; this isn't a production idea of mine :)
consumed Message test199
consumed Message test200
[error] (run-main-0) java.lang.IndexOutOfBoundsException: 0
java.lang.IndexOutOfBoundsException: 0
    at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
    at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
    at com.debugging.jobs.KafkaEmbedded$delayedInit$body.apply(KafkaEmbedded.scala:92)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.debugging.jobs.KafkaEmbedded$.main(KafkaEmbedded.scala:24)
    at com.debugging.jobs.KafkaEmbedded.main(KafkaEmbedded.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
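The trace above is consistent with `messageSet(0)` being applied to an empty buffer: once the last message (test200) has been read, a fetch at the next offset returns nothing, and indexing element 0 throws. A minimal sketch of a loop that checks for an empty batch and backs off instead is shown here. `Fetched`, `poll`, `maxEmptyPolls` and `backoffMs` are hypothetical names for illustration, and the `fetch` parameter stands in for the real `consumer1.fetch(...)` call followed by `fetchResponse.messageSet(topic, partition)`.

```scala
object PollSketch {
  // Hypothetical stand-in for one fetched (message, offset) pair; not Kafka API.
  final case class Fetched(offset: Long, payload: String)

  // Poll until `maxEmptyPolls` consecutive fetches come back empty, then
  // return the offset reached. `fetch` is a placeholder for the real
  // request/response round trip against the broker.
  def poll(fetch: Long => Seq[Fetched], start: Long,
           maxEmptyPolls: Int, backoffMs: Long): Long = {
    var offset = start
    var emptyPolls = 0
    while (emptyPolls < maxEmptyPolls) {
      val batch = fetch(offset)
      if (batch.isEmpty) {
        // Nothing new at this offset: wait rather than index into the batch.
        emptyPolls += 1
        Thread.sleep(backoffMs)
      } else {
        emptyPolls = 0
        batch.foreach(m => println("consumed Message " + m.payload))
        // Resume one past the last message actually seen.
        offset = batch.last.offset + 1
      }
    }
    offset
  }
}
```

For a development check that should run indefinitely, the bounded loop could be swapped for `while (true)` with the same empty-batch guard. With the real 0.8 client, the message-and-offset objects should expose a `nextOffset` value that can replace the manual `+ 1`, though that is worth confirming against the version in use.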