Posted to users@kafka.apache.org by Puneet Lakhina <pu...@gmail.com> on 2017/11/21 22:14:39 UTC
Streams and Windows
Hello,
I'm new to the Kafka ecosystem, so I apologize if this is a naive question.
What I'm looking to accomplish is the following:
- We get heartbeat events from a source, which are piped into a Kafka topic.
  These events are of the form {"session_id": "foo", "total_time_spent": x}.
  We get these events every 15 seconds or so. The total time spent is the
  total time for a given session id, so for a session id it's cumulative and
  thus monotonically increasing.
- Now we need to transform this stream to emit the incremental time spent.
  Essentially, for any event, incremental_time_spent is the current
  total_time_spent minus the previous total_time_spent. The notion of
  "previous" is time based.
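To make the transformation concrete, here is a minimal plain-Java sketch of the per-session arithmetic, assuming the heartbeats for one session are processed in arrival order. The class name `IncrementalExample`, the method `increments`, and the sample totals are hypothetical; nothing here is a Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalExample {
    // Converts one session's cumulative totals (in arrival order) into
    // per-heartbeat increments: incremental = current - previous.
    // The first heartbeat has no predecessor, so its increment is its total.
    static List<Long> increments(List<Long> cumulativeTotals) {
        List<Long> result = new ArrayList<>();
        long previous = 0;
        for (long total : cumulativeTotals) {
            // Clamp at zero, like Math.max(0, ...) in the original aggregator,
            // in case a total ever goes backwards (e.g. an out-of-order event).
            result.add(Math.max(0, total - previous));
            previous = total;
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical totals (seconds) for session "foo", one heartbeat every ~15 s.
        List<Long> totals = List.of(15L, 30L, 44L, 60L);
        System.out.println(increments(totals)); // prints [15, 15, 14, 16]
    }
}
```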
I'm wondering how to achieve this using the Streams API.
The first attempt I made was of the following form:
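KStreamBuilder builder = new KStreamBuilder();
builder
    // explicit type witness: in a chained call Java would otherwise infer
    // <Object, Object>, and the method reference below would not compile
    .<String, JsonNode>stream(topic)
    .groupByKey()
    // the initial aggregate is null; incrementalize() handles that case
    .aggregate(() -> null, Incrementalize::incrementalize)
    .toStream()
    .print();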
with the incrementalize method being the following:
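import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Aggregator: key = session id, value = current heartbeat,
// aggregate = result computed for the previous heartbeat (null at first)
private static JsonNode incrementalize(String sessionId, JsonNode currentNode,
                                       JsonNode currentAggregate) {
    ObjectNode result = JsonNodeFactory.instance.objectNode();
    ArrayNode valuesArray = JsonNodeFactory.instance.arrayNode();
    long currentTimeSpent = currentNode.get("total_time_spent").asLong();
    if (currentAggregate == null) {
        // first heartbeat for this session: the whole total is incremental
        result.put("incremental_time_spent", currentTimeSpent);
    } else {
        // copy the history of totals seen so far (grows with every heartbeat)
        ArrayNode values = (ArrayNode) currentAggregate.get("values");
        valuesArray.addAll(values);
        long previousTimeSpent = valuesArray.get(valuesArray.size() - 1).asLong();
        result.put("incremental_time_spent",
                   Math.max(0, currentTimeSpent - previousTimeSpent));
    }
    valuesArray.add(currentTimeSpent);
    result.set("values", valuesArray);
    return result;
}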
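One thing worth noting: the aggregate above copies the entire history of totals into "values" on every heartbeat, so the state (and every write of it to the state store) grows for as long as the session lives, even though only the last total is ever read. Below is a minimal sketch of a constant-size alternative, assuming plain `Long` values instead of `JsonNode`. `SessionDelta`, `CompactIncrementalize`, and the local `Aggregator` interface (which only mirrors the shape of Kafka Streams' `Aggregator<K, V, VA>` so the sketch compiles without the kafka-streams dependency) are hypothetical; in a real topology the aggregate type would also need a serde.

```java
// Local stand-in with the same shape as Kafka Streams' Aggregator<K, V, VA>.
interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

// Hypothetical compact aggregate: the last cumulative total and the
// increment computed from it, instead of the full history of values.
final class SessionDelta {
    final long lastTotal;
    final long incrementalTimeSpent;

    SessionDelta(long lastTotal, long incrementalTimeSpent) {
        this.lastTotal = lastTotal;
        this.incrementalTimeSpent = incrementalTimeSpent;
    }
}

public class CompactIncrementalize {
    // Same logic as incrementalize(), but per-session state stays constant-size.
    static final Aggregator<String, Long, SessionDelta> INCREMENTALIZE =
        (sessionId, totalTimeSpent, current) -> {
            if (current == null) {
                // First heartbeat for this session: the whole total is new.
                return new SessionDelta(totalTimeSpent, totalTimeSpent);
            }
            long delta = Math.max(0, totalTimeSpent - current.lastTotal);
            return new SessionDelta(totalTimeSpent, delta);
        };

    public static void main(String[] args) {
        SessionDelta agg = null; // mirrors the () -> null initializer
        for (long total : new long[] {15, 30, 44, 60}) {
            agg = INCREMENTALIZE.apply("foo", total, agg);
            System.out.println(agg.incrementalTimeSpent);
        }
    }
}
```

Each call reads only lastTotal, so the state does not grow with the number of heartbeats; for the sample totals, main prints 15, 15, 14 and 16 on separate lines.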
Now the problem I'm having with this approach is that the KTable ->
toStream() step doesn't emit every change: it only seems to emit a change
every 30 seconds, even though the heartbeats arrive every 15 seconds and are
being accumulated into the values array correctly.
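The 30-second cadence is consistent with Kafka Streams' defaults: KTable aggregation results pass through a record cache that is flushed downstream on commit (or when the cache fills up), and commit.interval.ms defaults to 30000 ms under at-least-once processing. Between commits, successive updates to the same key overwrite each other in the cache, so with a heartbeat every 15 seconds roughly every other update is never forwarded. The toy model below is a hypothetical simulation of that behavior, not Kafka's implementation; `CacheFlushModel`, `downstream`, and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CacheFlushModel {
    // Toy model, NOT Kafka's real code: a write-back cache keeps only the
    // latest value per key and forwards it downstream when a commit happens.
    // With caching disabled, every update is forwarded immediately.
    static List<String> downstream(long[] eventTimes, long[] totals,
                                   long commitInterval, boolean cacheEnabled) {
        Map<String, Long> cache = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        long nextCommit = commitInterval;
        for (int i = 0; i < eventTimes.length; i++) {
            if (!cacheEnabled) {
                out.add("t=" + eventTimes[i] + " total=" + totals[i]);
                continue;
            }
            // Commit (and flush the cache) for every interval that has elapsed.
            while (eventTimes[i] >= nextCommit) {
                flush(cache, nextCommit, out);
                nextCommit += commitInterval;
            }
            cache.put("foo", totals[i]); // overwrites any not-yet-flushed value
        }
        flush(cache, nextCommit, out); // final commit
        return out;
    }

    private static void flush(Map<String, Long> cache, long time, List<String> out) {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            out.add("t=" + time + " total=" + e.getValue());
        }
        cache.clear();
    }

    public static void main(String[] args) {
        long[] times = {0, 15, 30, 45, 60, 75};   // one heartbeat every 15 s
        long[] totals = {15, 30, 45, 60, 75, 90}; // hypothetical cumulative totals
        System.out.println(downstream(times, totals, 30, true));
        // prints [t=30 total=30, t=60 total=60, t=90 total=90]
        System.out.println(downstream(times, totals, 30, false).size()); // prints 6
    }
}
```

A consequence for this use case: because only the flushed snapshots reach downstream, a consumer that adds up the emitted incremental_time_spent values would miss the increments carried by the overwritten updates (here the ones for totals 15, 45 and 75).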
Is there a gap in my fundamental modeling of the problem into streams?
Could someone please point me in the right direction?
Thanks!
Puneet
--
Regards,
Puneet
Re: Streams and Windows
Posted by "Matthias J. Sax" <ma...@confluent.io>.
You need to disable the KTable cache to get every update, by setting the
cache size to zero:
https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
-Matthias
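Following that suggestion, here is a minimal sketch of the configuration, built with plain java.util.Properties so it runs without Kafka on the classpath. cache.max.bytes.buffering (exposed as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) is the real config key; the application.id and bootstrap.servers values are placeholders, and `NoCacheConfig` is a hypothetical name.

```java
import java.util.Properties;

public class NoCacheConfig {
    // Streams properties with the record cache disabled, so every KTable
    // update is forwarded downstream instead of being compacted until commit.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "incrementalize-app"); // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        // Total bytes for record caches across all stream threads; 0 disables caching.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        streamsProps().list(System.out);
    }
}
```

These properties would be passed in when constructing KafkaStreams. Disabling the cache trades more downstream records and state-store writes for seeing every update; lowering commit.interval.ms instead only makes flushes more frequent and does not guarantee that every update is forwarded. Newer Kafka releases deprecate this key in favor of statestore.cache.max.bytes.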