Posted to users@kafka.apache.org by RedShift <re...@telenet.be> on 2017/10/10 13:27:02 UTC

Getting started with stream processing

Hi all

I'm a complete noob with regard to stream processing; this is my first attempt. I'm going to try and explain my thought process. Here's what I'm trying to do:

I would like to create a sum of "load" for every hour, for every device.

Incoming stream of data:

{"deviceId":"1234","data":{"tss":1507619473,"load":9}}
{"deviceId":"1234","data":{"tss":1507619511,"load":8}}
{"deviceId":"1234","data":{"tss":1507619549,"load":5}}
{"deviceId":"9876","data":{"tss":1507619587,"load":8}}
{"deviceId":"1234","data":{"tss":1507619625,"load":8}}
{"deviceId":"1234","data":{"tss":1507619678,"load":8}}
{"deviceId":"9876","data":{"tss":1507619716,"load":8}}
{"deviceId":"9876","data":{"tss":1507619752,"load":9}}
{"deviceId":"1234","data":{"tss":1507619789,"load":8}}
{"deviceId":"9876","data":{"tss":1507619825,"load":8}}
{"deviceId":"9876","data":{"tss":1507619864,"load":8}}

Where
deviceId: unique ID for every device, which also doubles as the key I use
tss: UNIX timestamp in seconds
load: load indication

Expected outcome something like this:
deviceId: 1234, time: 2017-10-01 18:00, load: 25
deviceId: 1234, time: 2017-10-01 19:00, load: 13
deviceId: 9876, time: 2017-10-01 18:00, load: 33
deviceId: 9876, time: 2017-10-01 19:00, load: 5
...
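(Editor's note: to pin down the semantics being asked for, here is a plain-Java sketch, with no Kafka involved, that computes per-device, per-hour sums over the sample records above, bucketing by epoch-aligned UTC hour. The key format `deviceId@hourStart` is made up for illustration.)

```java
import java.util.*;

public class HourlyLoadSum {
    // Epoch-aligned start of the hour containing the timestamp (in seconds).
    public static long hourStart(long tssSeconds) {
        return (tssSeconds / 3600L) * 3600L;
    }

    // Sum "load" per (deviceId, hour) bucket; mirrors the grouping the post wants.
    public static Map<String, Integer> sums(String[] ids, long[] tss, int[] load) {
        Map<String, Integer> out = new TreeMap<>();
        for (int i = 0; i < ids.length; i++) {
            String key = ids[i] + "@" + hourStart(tss[i]);
            out.merge(key, load[i], Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] ids = {"1234","1234","1234","9876","1234","1234",
                        "9876","9876","1234","9876","9876"};
        long[] tss = {1507619473L,1507619511L,1507619549L,1507619587L,
                      1507619625L,1507619678L,1507619716L,1507619752L,
                      1507619789L,1507619825L,1507619864L};
        int[] load = {9,8,5,8,8,8,8,9,8,8,8};
        // All sample timestamps fall in the UTC hour starting at 1507618800
        // (2017-10-10 07:00 UTC), so there is one bucket per device.
        System.out.println(sums(ids, tss, load));
        // → {1234@1507618800=46, 9876@1507618800=41}
    }
}
```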


So I started:

   SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH"); // Important bit here, I use this to construct a grouping key
   KStreamBuilder builder = new KStreamBuilder();
   KStream<String, JsonObject> data = builder.stream("telemetry");

We need to group by device, so:

   KGroupedStream<String, JsonObject> grouped = data.groupBy((k, v) -> v.get("deviceId").asString());

But now I can't group the data again by date. So I made a combined grouping key like this:

   KGroupedStream<String, JsonObject> grouped = data.groupBy(
       (k, v) ->
       {
           Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
           return v.get("deviceId").asString() + dateFormat.format(dt);
       }
   );
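(Editor's note: one caveat with this key: SimpleDateFormat formats in the JVM's default time zone unless told otherwise, so the same record can land in different hour buckets on different machines. A minimal sketch of the key construction with the zone pinned to UTC, assuming UTC hour buckets are what is wanted here:)

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.TimeZone;

public class GroupingKey {
    // Builds the composite "deviceId + hour" key, pinned to UTC so the
    // bucketing does not depend on the machine's default time zone.
    public static String key(String deviceId, long tssSeconds) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        Date dt = Date.from(Instant.ofEpochSecond(tssSeconds));
        return deviceId + fmt.format(dt);
    }

    public static void main(String[] args) {
        // First sample record: tss 1507619473 is 2017-10-10 07:11:13 UTC.
        System.out.println(key("1234", 1507619473L)); // → 12342017-10-10 07
    }
}
```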

Now I need to reduce the groups to sum the load:

   grouped.reduce(new Reducer<JsonObject>()
   {
       @Override
       public JsonObject apply(JsonObject v1, JsonObject v2)
       {
           return null;
       }
   });

But that's a problem. I'm supposed to sum "load" here, but I also have to return a JsonObject. That doesn't seem right. So now I figure I have to extract the "load" before the reducer, but a KGroupedStream doesn't have a map() function.

Back to the drawing board. So I figure let's extract the "load" and grouping key first:

   KStream<Object, Object> map = data.map(new KeyValueMapper<String, JsonObject, KeyValue<?, ?>>()
   {
       @Override
       public KeyValue<String, Integer> apply(String s, JsonObject v)
       {
           Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
           String key = v.get("deviceId").asString() + dateFormat.format(dt);

           return new KeyValue<>(
               key, v.get("data").asObject().get("load").asInt()
           );
       }
   });

But now I'm left with a KStream of <Object, Object>. I've lost my types. If I change it to
KStream<String, Integer>, the compiler has this to say:

Error:(35, 54) java: incompatible types: inference variable KR has incompatible bounds
     equality constraints: java.lang.String
     lower bounds: java.lang.Object
     
Makes sense, as there's no guarantee that a random given object is a string. But how do I preserve types then?
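(Editor's note: the inference failure comes from declaring the anonymous class with the wildcard result type `KeyValue<?, ?>`, which gives the compiler nothing to bind KR/VR to; spelling out `KeyValueMapper<String, JsonObject, KeyValue<String, Integer>>`, or using a lambda, preserves the types. A self-contained sketch with stand-in classes whose names merely echo the Streams interfaces, not the real library:)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-ins that reproduce the inference issue outside of Kafka.
class KeyValue<K, V> {
    final K key; final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

interface KeyValueMapper<K, V, R> { R apply(K key, V value); }

public class TypedMapDemo {
    // Mirrors KStream.map: KR/VR are inferred from the mapper's KeyValue<KR, VR>.
    // A mapper declared with KeyValue<?, ?> leaves nothing to infer from, hence
    // "lower bounds: java.lang.Object".
    public static <K, V, KR, VR> List<KeyValue<KR, VR>> map(
            List<KeyValue<K, V>> in,
            KeyValueMapper<K, V, KeyValue<KR, VR>> mapper) {
        List<KeyValue<KR, VR>> out = new ArrayList<>();
        for (KeyValue<K, V> kv : in) out.add(mapper.apply(kv.key, kv.value));
        return out;
    }

    public static void main(String[] args) {
        List<KeyValue<String, Integer>> in =
                Arrays.asList(new KeyValue<>("1234", 9), new KeyValue<>("9876", 8));
        // A lambda (or an anonymous class with the full type parameters)
        // keeps <String, Integer> all the way through:
        List<KeyValue<String, Integer>> out =
                map(in, (k, v) -> new KeyValue<>(k, v + 1));
        System.out.println(out.get(0).key + "=" + out.get(0).value); // → 1234=10
    }
}
```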

I'm also unsure about the way I'm grouping things. It seems to me I have to group by deviceId and then use windowing to get the "per hour" part, but I'm even more clueless about how and where that fits in. For some reason I also think a KTable should be the final result?

Thanks,

Best regards,

Re: Getting started with stream processing

Posted by Guozhang Wang <wa...@gmail.com>.
Regarding windowing: actually the window boundaries are aligned to the epoch
(i.e. 1970-01-01 00:00:00 UTC), so the latest window is not NOW - 1 hour.
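(Editor's note: the alignment described here is plain integer arithmetic on the epoch timestamp; a minimal sketch, not the Streams API itself:)

```java
public class WindowAlign {
    // Epoch-aligned tumbling windows: the window containing ts starts at
    // ts - (ts % size). With size = 3600 s this yields clock-hour boundaries.
    public static long windowStart(long tsSeconds, long sizeSeconds) {
        return tsSeconds - (tsSeconds % sizeSeconds);
    }

    public static void main(String[] args) {
        // First sample record: tss 1507619473 (2017-10-10 07:11:13 UTC) falls
        // in the window [1507618800; 1507622400), i.e. 07:00-08:00 UTC.
        System.out.println(windowStart(1507619473L, 3600L)); // → 1507618800
    }
}
```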


Guozhang

On Wed, Oct 11, 2017 at 1:42 AM, RedShift <re...@telenet.be> wrote:


Re: Getting started with stream processing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Glad it works.

If you want to use windows, it should work out of the box, because windows
are automatically aligned. Windowing also seems more natural, and it allows
you to "expire old windows" eventually: with your current approach you never
delete old windows, so each window creates a new entry in the internal
key-value store, and your store grows unbounded over time.

https://docs.confluent.io/current/streams/developer-guide.html#tumbling-time-windows

> Tumbling time windows are aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, tumbling windows with a size of 5000ms have predictable window boundaries [0;5000),[5000;10000),... — and not [1000;6000),[6000;11000),... or even something “random” like [1452;6452),[6452;11452),....


-Matthias



On 10/11/17 1:42 AM, RedShift wrote:


Re: Getting started with stream processing

Posted by RedShift <re...@telenet.be>.
Matthias


Thanks, using grouping key of "deviceId + timestamp" with *aggregation* _instead of_ reducing solved it:


   KGroupedStream<String, JsonObject> grouped = data.groupBy(
       (k, v) ->
       {
           Date dt = Date.from(Instant.ofEpochSecond(v.get("data").asObject().get("tss").asLong()));
           return v.get("deviceId").asString() + dateFormat.format(dt);
       }
   );


   KTable<String, Integer> aggregate = grouped.aggregate(
       () -> 0,
       (aggKey, value, aggr) -> aggr + value.get("data").asObject().get("load").asInt(),
       Serdes.Integer()
   );

I'm still trying to find out how windowing fits in. It sounds like a tumbling window, but a tumbling window is defined by its length. So you get information for the last hour that has passed, but that last hour is a window of NOW - 1 hour. How do I get a window to align to hours of the clock?



On 10/10/2017 19:41, Matthias J. Sax wrote:

Re: Getting started with stream processing

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

if the aggregation returns a different type, you can use .aggregate(...)
instead of .reduce(...)
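(Editor's note: the difference is in the shape of the combine function. A plain-Java stand-in whose names merely echo the Streams interfaces, not the real API:)

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceVsAggregate {
    // Kafka's Reducer<V> is essentially (V, V) -> V: the result type must equal
    // the value type, so JsonObject records cannot reduce to an Integer sum.
    public static <V> V reduce(List<V> values, BinaryOperator<V> reducer) {
        V acc = values.get(0);
        for (int i = 1; i < values.size(); i++) acc = reducer.apply(acc, values.get(i));
        return acc;
    }

    // An Aggregator is (key, value, aggregate) -> aggregate: the aggregate type
    // VA is free to differ from V, which is what summing "load" needs.
    public interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

    public static <K, V, VA> VA aggregate(K key, List<V> values, VA initial,
                                          Aggregator<K, V, VA> agg) {
        VA acc = initial;
        for (V v : values) acc = agg.apply(key, v, acc);
        return acc;
    }

    public static void main(String[] args) {
        // Loads for device 1234 from the sample data; extracting the int and
        // summing is a type change, so it fits aggregate() but not reduce().
        List<Integer> loads = Arrays.asList(9, 8, 5, 8, 8, 8);
        int sum = aggregate("1234", loads, 0, (k, v, a) -> a + v);
        System.out.println(sum); // → 46
    }
}
```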

Also, for your time-based computation, did you consider using windowing?


-Matthias

On 10/10/17 6:27 AM, RedShift wrote: