Posted to users@kafka.apache.org by srinivas koniki <sr...@yahoo.com.INVALID> on 2016/11/05 22:28:14 UTC

Reopen KAFKA-4344 ?

Hi,
I'm still seeing the same issue with Spring Boot. Code is below; sorry, the code is in Groovy and not fully baked. I just have a single processor. It worked well with a single partition, but when I increased the partitions I started seeing the same error as in KAFKA-4344.


import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
import org.apache.kafka.streams.processor.AbstractProcessor
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.apache.kafka.streams.processor.TopologyBuilder
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.AfterReturning
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Pointcut
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.actuate.metrics.CounterService
import org.springframework.boot.actuate.metrics.GaugeService
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.Lifecycle
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer
import org.springframework.stereotype.Component
import org.springframework.test.context.ContextConfiguration
import org.springframework.util.StopWatch
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.Future
import java.util.stream.IntStream

/**
 * Created by srinivas.koniki on 11/5/16.
 */
@ContextConfiguration(classes=[TestConfig, MetricsAspect, RiakService])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class MetricsSpec extends Specification{

    static String kafkaTopic = 'testTopic'

    @Shared
    TestConfig testConfigRef

    @Autowired
    TestConfig testConfig

    @Autowired
    MetricRegistry metricRegistry

    @Autowired
    KafkaProducer kafkaProducer

    @Shared
    static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)

    def setupSpec() {
        println("Heavy init for all the tests...")
        CLUSTER.start()
        System.setProperty('broker.url',CLUSTER.bootstrapServers())
        System.setProperty('zk.url',CLUSTER.zKConnectString())
        System.setProperty('kafka.topic',kafkaTopic)
        CLUSTER.createTopic(kafkaTopic, 3, 1)
    }

    def cleanupSpec() {
        testConfigRef.stop()
        CLUSTER.stop()
    }

    def "Test send and receive" (){
        expect:
        testConfig != null
        metricRegistry != null
        println ''+metricRegistry.getGauges()

        when:
        testConfigRef = testConfig
        testConfig.start()
        List<Future> futureList = new ArrayList<>()
        IntStream.range(1,4).forEach({ i ->
            Future future = kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, 'test'+i, 'testMesg'+i))
            futureList.add(future)   // collect each future so the send results can be checked below
        })

        futureList.forEach({ future ->
           println future.get()
        })
        then:
        Thread.sleep(20000)

        println ''+metricRegistry.getGauges()
        println ''+metricRegistry.counters
        metricRegistry.counters.keySet().forEach({key ->
            println key+':'+metricRegistry.counters.get(key).count
        })
        Thread.sleep(2000)
    }

    @Configuration
    @SpringBootApplication
    static class TestConfig implements Lifecycle {

        @Value('${broker.url}')
        String brokerUrl

        Map<String, Object> producerConfig(){
            // values are a mix of Strings and Integers, so the map is typed Map<String, Object>
            ["bootstrap.servers" : brokerUrl, "acks" : "all", "retries": 0, "batch.size": 16384, "linger.ms": 1,
             "buffer.memory" : 33554432,
             "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
             "value.serializer" : "org.apache.kafka.common.serialization.StringSerializer"
            ]
        }

        @Bean
        KafkaProducer<String, String> kafkaProducer() {
            new KafkaProducer<String, String>(producerConfig())
        }

        @Bean
        public static PropertySourcesPlaceholderConfigurer properties() {
            return new PropertySourcesPlaceholderConfigurer()
        }

        @Value('${zk.url}')
        String zkUrl

        @Value('${kafka.topic}')
        String kafkaTopic

        @Autowired
        RiakService riakService

        @Autowired
        CounterService counterService

        KafkaStreams streams

        boolean state

        @Override
        void start() {
            println 'starting streams'
            Properties props = new Properties();
            props.put('group.id', "streams-test-processor-group");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-processor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkUrl);
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("Source", kafkaTopic);
            def processor = new PriceProcessor(riakService, counterService)
            // NOTE: this closure returns the same processor instance on every get();
            // Kafka Streams expects ProcessorSupplier#get() to return a fresh Processor
            // per task, which matters once the topic has more than one partition
            // (see the replies below).
            def procSupplier = {processor} as ProcessorSupplier
            builder.addProcessor("Process", procSupplier, "Source");

            streams = new KafkaStreams(builder, props);
            streams.start()
            println ' Streams started'
            state = true
        }

        @Override
        void stop() {
            streams.close()
            state = false
        }

        @Override
        boolean isRunning() {
            return state
        }

    }

    static class PriceProcessor extends AbstractProcessor<String, String> {
        RiakService riakClient1
        CounterService counterService
        PriceProcessor(RiakService riakClient2, CounterService counterService1){
            this.riakClient1 = riakClient2
            this.counterService = counterService1
        }

        @Override
        void process(String key, String value) {
            riakClient1.save(key, value)

            context().commit()
            println 'offset-'+context().offset()+', partition -'+context().partition()
            counterService.increment('kafka.partition.'+context().partition()+'.offset')
        }


    }

    @Aspect
    @Component
    static class MetricsAspect {
        final CounterService counterService
        final GaugeService gaugeService

        @Autowired
        MetricsAspect(GaugeService gaugeService1, CounterService counterService1){
            println 'aspect init'
            this.gaugeService = gaugeService1
            this.counterService = counterService1
        }

        @Pointcut("execution(* com.bsb.retail.price.kafka.RiakService.save(..))")
        public void methodsToBeProfiled(){}

        @Around("methodsToBeProfiled()")
        public Object profile(ProceedingJoinPoint pjp) throws Throwable {
            StopWatch sw = new StopWatch(getClass().getSimpleName());
            try {
                sw.start(pjp.getSignature().getName());
                return pjp.proceed();
            } finally {
                sw.stop();
                System.out.println(sw.prettyPrint());
                gaugeService.submit('riakSave', sw.lastTaskTimeMillis)
            }

        }

        @AfterReturning(pointcut = "execution(* com.bsb.retail.price.RiakService.save(..)) ")
        void metric(){
            println 'increment metrics counter'
            counterService.increment('message.count')
        }



    }


}

@Component
class RiakService {
    void save(String key, String value) {
        println 'Sending to riak '+Thread.currentThread().name
    }
}

Re: Reopen KAFKA-4344 ?

Posted by saiprasad mishra <sa...@gmail.com>.
Hi Srinivas

I raised that issue, and the way I got around it was to let Kafka Streams
run in a plain-POJO way rather than have some of the dependent instances
be Spring-managed beans.
If you create the instances of riakService and counterService in the
processor class, instead of passing the Spring-managed instances to the
processor constructor, your Kafka Streams initialization should be fine
and it should create the right number of tasks, with the right number of
processors, for all the partitions.
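
A minimal Groovy sketch of that change (assuming RiakService has a no-arg
constructor; the metrics counter is left out for brevity):

    // The processor creates its own collaborator instead of receiving a
    // Spring-managed singleton, and the supplier builds a fresh processor
    // on every get(), so each stream task gets its own instance.
    static class PriceProcessor extends AbstractProcessor<String, String> {
        final RiakService riakClient = new RiakService()

        @Override
        void process(String key, String value) {
            riakClient.save(key, value)
            context().commit()
        }
    }

    def procSupplier = { new PriceProcessor() } as ProcessorSupplier
    builder.addProcessor("Process", procSupplier, "Source")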

I was fine with the POJO-based approach because Kafka Streams has quite a
few APIs to query state (once it has started correctly, of course), as I
am running stateful processors and want to query the state data all the
time. I just used a Spring Boot controller in the web container to proxy
the Kafka Streams state store (ReadOnlyKeyValueStore) get APIs.
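
A rough sketch of that proxy idea, assuming a state store named
"price-store" was registered in the topology (store() and
QueryableStoreTypes are the interactive-query APIs a running KafkaStreams
instance exposes):

    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

    // look up the local state store once the streams instance is running
    ReadOnlyKeyValueStore<String, String> store =
            streams.store("price-store", QueryableStoreTypes.keyValueStore())
    String value = store.get("someKey")   // e.g. returned from a controller GET handler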

Alternatively, you can try making these two services prototype-scoped
components (if your use case is fine with that).
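
A minimal sketch of the prototype alternative: with prototype scope,
Spring hands out a new RiakService instance per injection point or lookup
instead of sharing one singleton across every processor:

    import org.springframework.context.annotation.Scope
    import org.springframework.stereotype.Component

    @Component
    @Scope("prototype")   // a fresh bean for each injection/lookup
    class RiakService {
        void save(String key, String value) { /* write to Riak */ }
    }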

Hope this helps.

Regards
Sai

On Mon, Nov 7, 2016 at 9:08 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> [...]

Re: Reopen KAFKA-4344 ?

Posted by "Matthias J. Sax" <ma...@confluent.io>.

KAFKA-4344 was not a bug. The issue was a wrong initialization order of
Kafka Streams by the user.
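
As a generic sketch of the safe order (not necessarily the exact
KAFKA-4344 fix), the topology should be fully built before KafkaStreams
is constructed and started:

    TopologyBuilder builder = new TopologyBuilder()
    builder.addSource("Source", "someTopic")                  // topic name assumed
    builder.addProcessor("Process", procSupplier, "Source")   // procSupplier assumed defined

    KafkaStreams streams = new KafkaStreams(builder, props)   // props assumed defined
    streams.start()   // start only once the topology is complete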

Please double-check your initialization order (and maybe read the old
email thread and JIRA comments -- they might have some relevant
information to help you fix the issue).

If the problem is still there, can you please reduce your code to a
minimal example that reproduces the problem?

Thanks!

-Matthias

On 11/5/16 3:28 PM, srinivas koniki wrote:
> [...]