Posted to jira@kafka.apache.org by "sam (JIRA)" <ji...@apache.org> on 2018/10/02 07:33:00 UTC

[jira] [Updated] (KAFKA-7470) Thread safe accumulator across all instances

     [ https://issues.apache.org/jira/browse/KAFKA-7470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sam updated KAFKA-7470:
-----------------------
    Description: 
Spark has a useful API for accumulating data in a thread-safe way [https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.AccumulatorV2], and it ships with some useful out-of-the-box accumulators, e.g. for Longs [https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.LongAccumulator].

I usually use accumulators to wire debugging, profiling, monitoring and diagnostics into Spark jobs. Before running a job, I typically fire off a Future that periodically prints the stats (e.g. TPS, histograms, counts, timings).

So far I cannot find anything similar for Kafka Streams. Does anything exist? I imagine this is possible at least within each instance of a Kafka Streams app, but making it work across several instances would require an intermediate topic.

Of course, we want to be able to call this accumulator in a similar way to the Spark Accumulator while preserving its guarantees. Example usage:
{code:java}
val countAccumulator: Accumulator[Long] = ...

Future {
  every(1 minute) {
    logger.info("Processed " + countAccumulator.value + " records")
  }
}

stream.map(x => {
  countAccumulator.add(1)

  x
}){code}
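For the single-instance case, something along the following lines might already be enough. This is only a sketch in plain Java under stated assumptions: {{AccumulatorSketch}}, {{CountAccumulator}}, {{countAndPass}} and {{processOnThreads}} are hypothetical names, not part of the Kafka Streams API, and plain threads stand in for stream threads. It uses {{java.util.concurrent.atomic.LongAdder}} for the thread-safe counter and a scheduled executor in place of the Future above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; none of these names exist in Kafka Streams.
class AccumulatorSketch {

    // Per-instance (single JVM) accumulator backed by LongAdder.
    static class CountAccumulator {
        private final LongAdder count = new LongAdder();

        // Safe to call concurrently from any thread.
        void add(long n) { count.add(n); }

        // Not an atomic snapshot while writers are still active.
        long value() { return count.sum(); }
    }

    // Stand-in for the function passed to stream.map(...):
    // count the record, then pass it through unchanged.
    static <T> T countAndPass(CountAccumulator acc, T record) {
        acc.add(1);
        return record;
    }

    // Simulates several threads that each process `perThread` records.
    static long processOnThreads(CountAccumulator acc, int threads, int perThread)
            throws InterruptedException {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    countAndPass(acc, "record-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join(); // after join, value() reflects all of that worker's adds
        }
        return acc.value();
    }

    public static void main(String[] args) throws InterruptedException {
        CountAccumulator acc = new CountAccumulator();

        // Periodic reporter, playing the role of the Future in the example above.
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(
                () -> System.out.println("Processed " + acc.value() + " records"),
                0, 1, TimeUnit.SECONDS);

        long total = processOnThreads(acc, 4, 100_000);
        reporter.shutdownNow();
        System.out.println("Final count: " + total);
    }
}
```

This only covers one JVM. It does not aggregate counts across instances and the count is lost on restart, which is exactly the part that would need an intermediate topic or similar support from Kafka Streams.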

  was:
For many situations in Big Data it is preferable to work with a small buffer of records at a time, rather than one record at a time.

The natural example is calling some external API that supports batching for efficiency.

How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.

So far I have:

{{builder.stream[String, String]("my-input-topic") .mapValues(externalApiCall).to("my-output-topic")}}

What I want is:

{{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")}}

In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In Spark Structured Streaming we can do {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.


https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams


> Thread safe accumulator across all instances
> --------------------------------------------
>
>                 Key: KAFKA-7470
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7470
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: sam
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)