You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Karol Nowak <ro...@gmail.com> on 2017/07/28 09:26:23 UTC

Question about messages

Hello!

Is it possible to send an array of Strings with a Kafka Producer object? I want
to take some messages from 'topic1' (lines of text), split each line into single
words, and send those words to another topic. I tried to use a foreach loop over
msg.toString.split("//+"), but it didn't help.

object KafkaConsumer extends App {
>
>       // Application entry point: builds one stream that reads records from
>       // "topic1" and forwards each record's value, unchanged, to "topic2".
>
>       // Implicit infrastructure used by the stream below; the execution
>       // context is the actor system's own dispatcher.
>       implicit val actorSystem = ActorSystem("test-actor-system")
>       implicit val streamMaterializer = ActorMaterializer()
>       implicit val executionContext = actorSystem.dispatcher
>       // Logger taken from the actor system; not referenced again in this snippet.
>       val log = actorSystem.log
>
>
>       // PRODUCER config
>       // Keys are serialized as byte arrays and values as strings; the producer
>       // connects to localhost:9092.
>       // NOTE(review): "auto.create.topics.enable" is documented as a broker
>       // setting -- confirm it has any effect when passed as a producer property.
>       val producerSettings = ProducerSettings(
>         actorSystem,
>         new ByteArraySerializer,
>         new StringSerializer)
>         .withBootstrapServers("localhost:9092")
>         .withProperty("auto.create.topics.enable", "true")
>
>       // CONSUMER config
>       // Byte-array keys and string values, matching the producer side. The
>       // consumer uses group id "kafka-sample" and sets auto.offset.reset to
>       // "earliest".
>       val consumerSettings = ConsumerSettings(
>         system = actorSystem,
>         keyDeserializer = new ByteArrayDeserializer,
>         valueDeserializer = new StringDeserializer)
>         .withBootstrapServers("localhost:9092")
>         .withGroupId("kafka-sample")
>         .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>       // -----------------------------------------------------------------------//
>
>       // ROUTE OF THE APP
>       // Subscribes to "topic1"; each incoming message is printed and then
>       // wrapped in a ProducerMessage addressed to "topic2".
>       Consumer.committableSource(consumerSettings,
>       Subscriptions.topics("topic1"))
>      .map {
>            // Side effect only: prints the whole message object for debugging.
>            msg => println(s"topic1 -> topic2: $msg")
>            // The record is built from the topic name and the value only (no key
>            // argument). The value is forwarded as a single string -- nothing here
>            // splits it into words, so exactly one output record is produced per
>            // input record. The message's committable offset is passed along as
>            // the second argument (presumably so the sink can commit it after
>            // producing -- confirm against the library docs).
>            ProducerMessage.Message(new ProducerRecord[Array[Byte], String]( "topic2", msg.record.value), msg.committableOffset)
>           }
>      // Runs the stream into the producer sink configured with producerSettings.
>      .runWith(Producer.commitableSink(producerSettings))
>      }
>
>