Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2018/05/02 20:17:01 UTC

Re: KStreams API Usage

Hello Pradeep,

If you just want to close the Streams app on the first observation of a
specific value, consider using a shutdown latch: the code inside
`transformValues()` counts the latch down when it sees the value, and the
main thread that started the streams instance waits on the latch. Once the
latch has been counted down, the main thread calls kafkaStreams.close(..).
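
As a rough illustration of the latch idea, here is a minimal sketch that uses
only java.util.concurrent, so it runs without a Kafka dependency. The class
name ShutdownLatchSketch, the methods observe() and awaitThenClose(), and the
SHUTDOWN_MARKER constant are hypothetical names made up for this example; a
plain thread stands in for the stream thread, and the Runnable passed to
awaitThenClose() stands in for the kafkaStreams.close(..) call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownLatchSketch {
    // Hypothetical marker; stands in for MERCH_HIERARCHY_CACHE_EMPTY.
    static final String SHUTDOWN_MARKER = "MERCH_HIERARCHY_CACHE_EMPTY";

    // Counted down by the processing code, awaited by the main thread.
    final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Would be called per record from the ValueTransformer (assumption:
    // each record carries a list of error messages).
    void observe(List<String> errorMessages) {
        if (errorMessages.contains(SHUTDOWN_MARKER)) {
            // Further countDown() calls after the first have no effect.
            shutdownLatch.countDown();
        }
    }

    // Called from the main thread: waits for the marker, then runs the
    // close action. Returns false on timeout or interruption.
    boolean awaitThenClose(Runnable closeAction, long timeoutMs) {
        try {
            boolean seen = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            if (seen) {
                // In a real app, for example:
                // kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS)
                closeAction.run();
            }
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownLatchSketch sketch = new ShutdownLatchSketch();
        AtomicBoolean closed = new AtomicBoolean(false);

        // Stand-in for the stream thread processing records.
        Thread streamThread = new Thread(() -> {
            sketch.observe(Arrays.asList("ok"));
            sketch.observe(Arrays.asList("ok", SHUTDOWN_MARKER));
        });
        streamThread.start();

        boolean seen = sketch.awaitThenClose(() -> closed.set(true), 5_000);
        streamThread.join();
        System.out.println("marker seen: " + seen + ", closed: " + closed.get());
    }
}
```

With this shape the processing code only signals, and the decision to close
stays with the thread that created the streams instance.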

You can see a concrete example in the simple benchmark code:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L652-L668


Guozhang


On Fri, Apr 27, 2018 at 7:15 PM, pradeep s <sr...@gmail.com>
wrote:

> Hi,
>
> I am trying to call Kafka Streams close based on the presence of a value in
> the output of a ValueTransformer. The ValueTransformer produces a
> List<TransformerResult>.
>
> Is there a way to avoid the foreach on the KStream and get only the
> first value (like the streams API method findFirst)?
>
> private void checkMerchHierarchyEmpty(KStream<byte[], List<TransformerResult>> trans) {
>     trans.filter((key, value) -> value.stream().anyMatch(
>             val -> MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage())))
>         .foreach((key, value) -> {
>             metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
>             log.fatal("Shutting down kafka stream since merch hierarchy is empty");
>             kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
>         });
> }
>
>
> Thanks
> Pradeep
>



-- 
-- Guozhang