Posted to users@kafka.apache.org by Puneet Lakhina <pu...@gmail.com> on 2017/11/21 22:14:39 UTC

Streams and Windows

Hello,

I'm new to the Kafka ecosystem, so I apologize if this is a naive question.

What I'm looking to accomplish is the following:

- We get heartbeat events from a source, and these are piped into a Kafka
topic. The events have the form {"session_id": "foo", "total_time_spent": x}.
We get one every 15 seconds or so. The total time spent is the total time for
a given session id, so for a session id it is cumulative and thus
monotonically increasing.
- Now we need to transform this stream to emit the incremental time spent.
So for any event, incremental_time_spent is the current total_time_spent
minus the previous total_time_spent. The notion of "previous" is time-based.
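The arithmetic in the second bullet can be sketched for a single session as follows. This assumes heartbeats arrive in order and, matching the aggregator further down, that the first heartbeat's increment is its full total and that negative differences are clamped to zero. `IncrementExample` and `increments` are hypothetical names, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns one session's cumulative totals, in arrival order,
// into per-heartbeat increments (current total minus previous total).
final class IncrementExample {
    static List<Long> increments(List<Long> cumulativeTotals) {
        List<Long> result = new ArrayList<>();
        long previous = 0L; // no earlier heartbeat, so the first increment is the total itself
        for (long total : cumulativeTotals) {
            // Clamp at zero, as the aggregator in this thread does
            result.add(Math.max(0L, total - previous));
            previous = total;
        }
        return result;
    }
}
```

For example, totals 15, 30, 45, 61 give increments 15, 15, 15, 16.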

I'm wondering how to achieve the above using the Streams API.

The first attempt I made was of the following form:

KStreamBuilder builder = new KStreamBuilder();

builder
    .<String, JsonNode>stream(topic)   // explicit key/value types so the method reference below type-checks
    .groupByKey()
    .aggregate(() -> null, Incrementalize::incrementalize)
    .toStream()
    .print();


with the incrementalize method being the following:


private static JsonNode incrementalize(String sessionId, JsonNode currentNode,
                                       JsonNode currentAggregate) {
    ObjectNode result = JsonNodeFactory.instance.objectNode();
    ArrayNode valuesArray = JsonNodeFactory.instance.arrayNode();
    long currentTimeSpent = currentNode.get("total_time_spent").asLong();

    if (currentAggregate == null) {
        // First heartbeat for this session: the increment is the whole total
        result.put("incremental_time_spent", currentTimeSpent);
    } else {
        // Copy the previous totals and diff against the most recent one
        ArrayNode values = (ArrayNode) currentAggregate.get("values");
        valuesArray.addAll(values);
        result.put("incremental_time_spent",
                Math.max(0, currentTimeSpent - valuesArray.get(valuesArray.size() - 1).asLong()));
    }

    valuesArray.add(currentTimeSpent);
    result.set("values", valuesArray);
    return result;
}
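A possible variation, sketched under the assumption that only the most recent total is ever needed: the aggregate keeps the previous total and the latest increment instead of the full `values` array, so per-session state stays constant-size however long the session runs. `SessionTime` and `BoundedIncrementalize` are hypothetical names, and the sketch uses a plain Java class rather than JsonNode; plugging it into a topology would also need a Serde for `SessionTime`, which is not shown.

```java
// Hypothetical aggregate: the previous cumulative total plus the latest increment.
final class SessionTime {
    final long lastTotal;
    final long incrementalTimeSpent;

    SessionTime(long lastTotal, long incrementalTimeSpent) {
        this.lastTotal = lastTotal;
        this.incrementalTimeSpent = incrementalTimeSpent;
    }
}

final class BoundedIncrementalize {
    // Same (key, value, aggregate) -> new aggregate shape as a Kafka Streams Aggregator;
    // a null aggregate means this is the first heartbeat seen for the session.
    static SessionTime incrementalize(String sessionId, long totalTimeSpent, SessionTime current) {
        if (current == null) {
            return new SessionTime(totalTimeSpent, totalTimeSpent);
        }
        long delta = Math.max(0L, totalTimeSpent - current.lastTotal);
        return new SessionTime(totalTimeSpent, delta);
    }
}
```

Feeding totals 15, 30, 46 in sequence yields increments 15, 15, 16, with only two longs stored per session.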


Now the problem I'm having with the above approach is that KTable#toStream()
doesn't emit every change. It only seems to emit a change every 30 seconds,
even though the heartbeats arrive every 15 seconds and are accumulated into
the values array correctly.
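One likely reason for the 30-second rhythm: in Kafka Streams, KTable updates pass through a per-key record cache that is flushed on commit (or when it fills up), and the default `commit.interval.ms` is 30000 ms when exactly-once is not enabled. The toy model below is a simplification for illustration, not Kafka's actual implementation; `ToyRecordCache` is a hypothetical name. With two heartbeats per commit interval, only the later one reaches downstream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a per-key cache in front of a KTable: updates to the same key
// overwrite each other until commit, and only the latest value per key is emitted.
final class ToyRecordCache {
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();

    void put(String key, long value) {
        dirty.put(key, value); // a later update replaces the pending one
    }

    void commit() {
        for (Map.Entry<String, Long> e : dirty.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Putting 15 and 30 for "foo", committing, then putting 45 and 60 and committing again emits only "foo=30" and "foo=60": half of the updates never reach the downstream stream.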

Is there a gap in how I've modeled this problem with streams?
Could someone please point me in the right direction?


Thanks!

Puneet

-- 
Regards,
Puneet

Re: Streams and Windows

Posted by "Matthias J. Sax" <ma...@confluent.io>.
You need to disable the KTable cache to get every update, by setting the
cache size to zero:

https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
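A minimal sketch of that setting, assuming the configuration is built as `java.util.Properties`. Plain string keys are used so the snippet compiles without Kafka on the classpath; `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` names the same `cache.max.bytes.buffering` key. `NoCacheConfig`, the application id, and the broker address are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical config builder for a Streams app with the record cache disabled.
final class NoCacheConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "incrementalize-app"); // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        // 0 bytes of record cache: every KTable update is forwarded downstream
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }
}
```

Disabling the cache means every update is written to the state store's changelog and forwarded downstream, so output volume goes up; lowering `commit.interval.ms` instead is a middle ground that trades latency against deduplication. Newer Kafka releases deprecate `cache.max.bytes.buffering` in favour of `statestore.cache.max.bytes`.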


-Matthias

