You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Karol Nowak <ro...@gmail.com> on 2017/07/28 09:26:23 UTC
Question about messages
Hello!
Is it possible to send an array of String by Kafka Producer object. I want
to take some messages from 'topic1' - lines of text then split it to single
words and send it to another topic. I tried to use foreach loop over
msg.toString.split("//+") but it didn't help me.
object KafkaConsumer extends App {
>
> implicit val actorSystem = ActorSystem("test-actor-system")
> implicit val streamMaterializer = ActorMaterializer()
> implicit val executionContext = actorSystem.dispatcher
> val log = actorSystem.log
>
>
> // PRODUCER config
> val producerSettings = ProducerSettings(
> actorSystem,
> new ByteArraySerializer,
> new StringSerializer)
> .withBootstrapServers("localhost:9092")
> .withProperty("auto.create.topics.enable", "true")
>
> // CONSUMER config
> val consumerSettings = ConsumerSettings(
> system = actorSystem,
> keyDeserializer = new ByteArrayDeserializer,
> valueDeserializer = new StringDeserializer)
> .withBootstrapServers("localhost:9092")
> .withGroupId("kafka-sample")
> .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> // -----------------------------------------------------------------------//
>
> // ROUTE OF THE APP
> Consumer.committableSource(consumerSettings,
> Subscriptions.topics("topic1"))
> .map {
> msg => println(s"topic1 -> topic2: $msg")
> ProducerMessage.Message(new ProducerRecord[Array[Byte], String]( "topic2", msg.record.value), msg.committableOffset)
> }
> .runWith(Producer.commitableSink(producerSettings))
> }
>
>