Posted to users@kafka.apache.org by Shekar Tippur <ct...@gmail.com> on 2016/07/03 04:42:29 UTC

Re: Building API to make Kafka reactive

Dean,

Thanks a lot for the link. I am going through the documentation.

- Shekar

On Wed, Jun 29, 2016 at 9:50 AM, Dean Wampler <de...@gmail.com> wrote:

> Here's another Reactive API: https://github.com/akka/reactive-kafka
>
> It was developed by Software Mill <https://softwaremill.com/> and it's now
> being integrated with Akka <http://akka.io>.
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Lightbend <http://lightbend.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Wed, Jun 29, 2016 at 11:03 AM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Thanks for the suggestion, Lohith. Will try that and provide feedback.
> >
> > - Shekar
> >
> > On Tue, Jun 28, 2016 at 11:45 PM, Lohith Samaga M <
> > Lohith.Samaga@mphasis.com
> > > wrote:
> >
> > > Hi Shekar,
> > >         Alternatively, you could have each stage of your pipeline write
> > > to Cassandra (or another DB) and have your API read from it. With a
> > > Cassandra TTL, the row is deleted automatically once the TTL expires, so
> > > no manual cleanup is required.
> > >
> > > Best regards / Mit freundlichen Grüßen / Sincères salutations
> > > M. Lohith Samaga
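
A minimal sketch of the Cassandra TTL approach suggested above, assuming the
DataStax Java driver (com.datastax.driver.core) and a hypothetical keyspace,
table, topic of "status" columns, and session ID; this is illustrative only,
not part of the original suggestion.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class PipelineStatusStore {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        try {
            Session session = cluster.connect();
            session.execute("CREATE KEYSPACE IF NOT EXISTS pipeline "
                    + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE IF NOT EXISTS pipeline.status "
                    + "(session_id text PRIMARY KEY, stage text)");

            // Each pipeline stage upserts its current status; the row expires
            // on its own one hour later, so nothing has to clean it up.
            session.execute("INSERT INTO pipeline.status (session_id, stage) "
                    + "VALUES ('abc-123', 'in progress') USING TTL 3600");

            // The API reads the same row to answer status queries.
            Row row = session.execute(
                    "SELECT stage FROM pipeline.status WHERE session_id = 'abc-123'").one();
            System.out.println(row == null ? "unknown session" : row.getString("stage"));
        } finally {
            cluster.close();
        }
    }
}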
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: Shekar Tippur [mailto:ctippur@gmail.com]
> > > Sent: Wednesday, June 29, 2016 12.10
> > > To: users
> > > Subject: Building API to make Kafka reactive
> > >
> > > I am looking at building a reactive API on top of Kafka.
> > > The API produces events to a Kafka topic, and I want to add a unique
> > > session ID to the payload.
> > > The data gets transformed as it goes through the different stages of a
> > > pipeline. I want to specify a final topic from which the API can tell
> > > that the processing was successful.
> > > The API should report a different status at each stage of the pipeline.
> > > At ingestion, the API responds with "submitted".
> > > While the pipeline is running, the API returns "in progress".
> > > After successful completion, the API returns "Success".
> > >
> > > A couple of questions:
> > > 1. Is this feasible?
> > > 2. I was looking at Project Reactor (https://projectreactor.io), whose
> > > docs talk about an event bus. I wanted to see if I can implement a
> > > consumer that points to the "end" topic and throws an event into the
> > > event bus. Since I would know the session ID, I can process the request
> > > accordingly.
> > >
> > > Appreciate your inputs.
> > >
> > > - Shekar
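
A minimal sketch of one way the "end"-topic idea above could be wired up
without an event bus, using only the plain Kafka consumer API: a background
consumer on the final topic completes a per-session future that the API layer
can use to move a session from "in progress" to "Success". The topic name,
group id, and the use of the session ID as the record key are assumptions,
not something stated in the thread.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SessionStatusTracker {
    // sessionId -> future completed when the session's record reaches the final topic
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the API right after producing the event; the API can answer
    // "submitted" immediately and "Success" once the future completes.
    public CompletableFuture<String> track(String sessionId) {
        return pending.computeIfAbsent(sessionId, id -> new CompletableFuture<>());
    }

    // Run in a background thread: consume the final topic and complete matching sessions.
    public void consumeEndTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "status-tracker");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("pipeline-done")); // hypothetical final topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                CompletableFuture<String> future = pending.remove(record.key());
                if (future != null) {
                    future.complete("Success");
                }
            }
        }
    }
}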
> >
>

Re: Building API to make Kafka reactive

Posted by Shekar Tippur <ct...@gmail.com>.
Is there any way I can get a small working example to start with?

- Shekar

On Wed, Jul 13, 2016 at 10:39 AM, Shekar Tippur <ct...@gmail.com> wrote:

> Dean,
>
> I am having trouble getting this to work.
>
> import akka.actor.ActorSystem;
> import akka.kafka.scaladsl.Producer;
> import akka.stream.javadsl.Source;
> import akka.kafka.ProducerSettings;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> abstract class KafkaPlProducer {
>     protected static ActorSystem system = ActorSystem.create("example");
>     protected static ProducerSettings<byte[], String> producerSettings =
>             ProducerSettings.create(system, new ByteArraySerializer(), new StringSerializer())
>             .withBootstrapServers("localhost:9092");
> }
>
> class PlumberSink extends KafkaPlProducer {
>     //protected final ActorSystem system = ActorSystem.create("example");
>     public static void main(String args[]) {
>
>         Source.range(1, 10000)
>                 .map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
>                 .to(Producer.plainSink(producerSettings));
>     }
>     //Source.range(1, 10000).map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem)).to(Producer.plainSink(producerSettings));
>
> }
>
>
> Here is the exception:
>
> Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Iterable
> 	at com.companyname.plumber.service.rest.api.internals.KafkaPlProducer.<clinit>(PlumberSink.java:25)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:264)
> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
> Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	... 4 more
>
>
>

Re: Building API to make Kafka reactive

Posted by Dean Wampler <de...@gmail.com>.
You don't have the Scala library on the application classpath; it is
required because Akka is implemented in Scala.

Use the same version that's required for the Akka libraries you're using.

http://mvnrepository.com/artifact/org.scala-lang/scala-library

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Lightbend <http://lightbend.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com
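
A minimal, self-contained version of the snippet quoted below that compiles
and actually runs the stream, with the dependency that fixes the
NoClassDefFoundError noted in comments. It swaps akka.kafka.scaladsl.Producer
for the javadsl variant and adds an ActorMaterializer so the graph is
materialized; artifact names and versions are indicative, not prescriptive.

// Indicative dependencies (use the versions matching your Akka build):
//   org.scala-lang:scala-library (e.g. 2.11.x)  -- fixes the scala/collection/Iterable error
//   com.typesafe.akka:akka-stream-kafka_2.11    -- reactive-kafka
import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainSinkExample {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("example");
        Materializer materializer = ActorMaterializer.create(system);

        ProducerSettings<byte[], String> producerSettings =
                ProducerSettings.create(system, new ByteArraySerializer(), new StringSerializer())
                        .withBootstrapServers("localhost:9092");

        // .to(...) only describes the graph; runWith(...) actually materializes and starts it.
        Source.range(1, 10000)
                .map(n -> n.toString())
                .map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
                .runWith(Producer.plainSink(producerSettings), materializer);

        // The stream runs asynchronously; the ActorSystem keeps the JVM alive
        // until system.terminate() is called once you are done.
    }
}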

On Wed, Jul 13, 2016 at 12:39 PM, Shekar Tippur <ct...@gmail.com> wrote:

> Dean,
>
> I am having trouble getting this to work.
>
> import akka.actor.ActorSystem;
> import akka.kafka.scaladsl.Producer;
> import akka.stream.javadsl.Source;
> import akka.kafka.ProducerSettings;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> abstract class KafkaPlProducer {
>     protected static ActorSystem system = ActorSystem.create("example");
>     protected static ProducerSettings<byte[], String> producerSettings =
>             ProducerSettings.create(system, new ByteArraySerializer(), new StringSerializer())
>             .withBootstrapServers("localhost:9092");
> }
>
> class PlumberSink extends KafkaPlProducer {
>     //protected final ActorSystem system = ActorSystem.create("example");
>     public static void main(String args[]) {
>
>         Source.range(1, 10000)
>                 .map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
>                 .to(Producer.plainSink(producerSettings));
>     }
>     //Source.range(1, 10000).map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem)).to(Producer.plainSink(producerSettings));
>
> }
>
>
> Here is the exception:
>
> Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Iterable
>         at com.companyname.plumber.service.rest.api.internals.KafkaPlProducer.<clinit>(PlumberSink.java:25)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:264)
>         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
> Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         ... 4 more
>
>
>

Re: Building API to make Kafka reactive

Posted by Shekar Tippur <ct...@gmail.com>.
Dean,

I am having trouble getting this to work.

import akka.actor.ActorSystem;
import akka.kafka.scaladsl.Producer;
import akka.stream.javadsl.Source;
import akka.kafka.ProducerSettings;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

abstract class KafkaPlProducer {
    protected static ActorSystem system = ActorSystem.create("example");
    protected static ProducerSettings<byte[], String> producerSettings =
            ProducerSettings.create(system, new ByteArraySerializer(), new StringSerializer())
            .withBootstrapServers("localhost:9092");
}

class PlumberSink extends KafkaPlProducer {
    //protected final ActorSystem system = ActorSystem.create("example");
    public static void main(String args[]) {

        Source.range(1, 10000)
                .map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
                .to(Producer.plainSink(producerSettings));
    }
    //Source.range(1, 10000).map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem)).to(Producer.plainSink(producerSettings));

}


Here is the exception:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Iterable
	at com.companyname.plumber.service.rest.api.internals.KafkaPlProducer.<clinit>(PlumberSink.java:25)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:122)
Caused by: java.lang.ClassNotFoundException: scala.collection.Iterable
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 4 more


