Posted to users@kafka.apache.org by Marco Abitabile <ma...@gmail.com> on 2017/04/16 18:44:50 UTC

Kafka Streams - Join synchronization issue

Hi All!

I need a little hint to understand how joins work with regard to stream
synchronization.

This mail is a bit long, since I need to explain the issue I'm facing.

*TL;DR:*
It seems that join synchronization between streams is not respected as
explained here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
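
For readers who don't follow the link: the idea described in that KIP-28
section is that a task reading from several inputs picks its next record
from the input whose buffered head record has the smallest timestamp. The
following is only a minimal stand-alone sketch of that selection rule. The
class and method names are hypothetical, it models each input as a queue of
timestamps, and it is not the Kafka Streams implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of timestamp-based stream synchronization.
final class TimestampSyncSketch {

    /** Drains both buffers, always taking the head record with the smaller timestamp. */
    static List<Long> drainInTimestampOrder(Deque<Long> a, Deque<Long> b) {
        List<Long> out = new ArrayList<>();
        while (!a.isEmpty() || !b.isEmpty()) {
            Deque<Long> next;
            if (a.isEmpty()) {
                next = b;                       // only b has buffered data
            } else if (b.isEmpty()) {
                next = a;                       // only a has buffered data
            } else {
                next = a.peekFirst() <= b.peekFirst() ? a : b;
            }
            out.add(next.pollFirst());
        }
        return out;
    }

    public static void main(String[] args) {
        Deque<Long> geo = new ArrayDeque<>(Arrays.asList(1000L, 2000L, 3000L));
        Deque<Long> activity = new ArrayDeque<>(Arrays.asList(1500L, 1600L, 2500L));
        System.out.println(drainInTimestampOrder(geo, activity));
    }
}
```

Note that this rule can only order records that are already buffered: if one
input is empty at the moment of the choice, the sketch simply reads from the
other one.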

*The need:*
I have historical data residing in some databases, more specifically:
  - time series of user activities
  - time series of user geo positions

*What I do:*
I have a new algorithm I want to try, but the historical data has already
been pruned by the Kafka retention policy, so I only have it in a database.
This is what I'm doing:
  1- spin up a kafka-connect source that takes historical gps data (let's
say, one day of data), ordered by event time, and pushes it into the
"HistoricalGpsData" topic. This task pushes historical geo data into the
Kafka topic as fast as possible, respecting the original event time.
  2- spin up a kafka-connect source that takes historical user activities
(let's say, one day of data, the same day as the gps data, of course),
ordered by event time, and pushes them into the "HistoricalUserActivites"
topic. This task pushes historical user activity data into the Kafka topic
as fast as possible, respecting the original event time.
  3- spin up my new stream processor algorithm

Due to the nature of the data, there is much more activity data than geo
data, so task 1 pushes all the needed geo data into the Kafka topic within
a few minutes (around 10 minutes), while the activity data, having a higher
volume, takes about 1 hour to be pushed entirely.
--> the two streams are pushed into Kafka without regard to their
synchronization (although I am aware of their nature, as explained above)

*What I expect:*
Now, what I would expect is that when I perform the join between the two
streams:

   userActivitiesStream.join(geoDataStream, timewindow...)

the join takes the incoming user activity data and joins it with the geo
data, respecting the given time window.
Due to the nature of the data, there is always a match (within the given
time window) between user activity data and geo data (in fact, when this
data arrives in real time, there are no issues at all).
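
To make that expectation concrete, here is a minimal sketch of the matching
rule in plain Java, with no Kafka dependency: two records join when they
share a key and their event timestamps differ by at most the window size.
The Rec class, the sample data and the inclusive bound are assumptions made
for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an event-time windowed join between two record lists.
final class WindowedJoinSketch {

    /** A keyed record with an event timestamp in milliseconds. */
    static final class Rec {
        final String key;
        final long ts;
        final String value;

        Rec(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    /** Joins every activity with every geo record of the same key within windowMs. */
    static List<String> join(List<Rec> activities, List<Rec> geo, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec a : activities) {
            for (Rec g : geo) {
                if (a.key.equals(g.key) && Math.abs(a.ts - g.ts) <= windowMs) {
                    out.add(a.value + "@" + g.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> activities = Arrays.asList(
                new Rec("u1", 10_000L, "click"),
                new Rec("u1", 70_000L, "scroll"));
        List<Rec> geo = Arrays.asList(
                new Rec("u1", 10_400L, "Rome"),
                new Rec("u1", 69_500L, "Milan"),
                new Rec("u2", 10_000L, "Paris"));
        System.out.println(join(activities, geo, 1000L));
    }
}
```

This sketch has both inputs fully available in memory, so the order and
speed of arrival play no role in it.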

So I expect the join to pick up the right geo data from the topic
(recall that geo data is pushed into the topic within 10 minutes) and join
it with the user activity data (recall that user activity data is a
stream that takes around 1 hour).

*What I get:*
What happens is that only the first few minutes of user data are actually
processed by the join; after that, user data keeps coming in but the join
doesn't join any data anymore.

It seems that the join doesn't respect the time semantics (configured to be
the default strategy: event time) unless the two streams are synchronized
(in fact, this is the case for the first few minutes, when I start the
whole reprocessing task).


Can you help me find the right clue? Do I have to push the two
streams in a synchronized fashion (such as simulating the real-time data
flow, as they came into the system the first time)?

Thanks for your support.

Best
Marco

Re: Kafka Streams - Join synchronization issue

Posted by Damian Guy <da...@gmail.com>.
Hi Marco,

W.r.t. your JoinWindows: I think you'll want to increase the until time, as
this is roughly how long a record will be kept in the local state store
that is used for joins (based on event time). So if you have data arriving
on different streams at varying rates, the event time of one of your
streams is likely advancing faster than that of the other. In your current
code, if one of the streams is ahead of the other by more than 5 minutes,
then there is a good chance the data from the faster stream has been
dropped from the local store, so the join won't happen.
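
A rough way to see this effect is to simulate it without Kafka. The sketch
below is a simplified model under stated assumptions, not the actual
window store: the geo stream is ingested `speedup` records per activity
record, the geo store drops anything older than `retentionMs` relative to
its newest timestamp, and each activity record then looks for a geo record
within `windowMs`. All names and numbers are hypothetical.

```java
import java.util.TreeMap;

// Hypothetical simulation of a join store with event-time based retention.
final class JoinRetentionSketch {

    /** Counts activity records that still find a geo record within windowMs. */
    static int countMatches(long spanMs, int speedup, long windowMs, long retentionMs) {
        TreeMap<Long, String> geoStore = new TreeMap<>();
        long geoTs = 0;
        int matches = 0;
        // One activity record per second of event time.
        for (long actTs = 0; actTs <= spanMs; actTs += 1000) {
            // Meanwhile the faster geo stream ingests `speedup` records.
            for (int i = 0; i < speedup && geoTs <= spanMs; i++, geoTs += 1000) {
                geoStore.put(geoTs, "geo@" + geoTs);
                // Drop everything older than retentionMs behind the newest geo record.
                geoStore.headMap(geoTs - retentionMs, false).clear();
            }
            // The activity record joins only if a geo record is still retained nearby.
            if (!geoStore.subMap(actTs - windowMs, true, actTs + windowMs, true).isEmpty()) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60 * 1000;
        long fiveMinutesMs = 5L * 60 * 1000;
        System.out.println("5 min retention: " + countMatches(oneHourMs, 6, 1000, fiveMinutesMs));
        System.out.println("1 h retention:   " + countMatches(oneHourMs, 6, 1000, oneHourMs));
    }
}
```

In this model a retention that covers the whole lag between the two streams
lets every activity record find its partner, while the 5-minute retention
only matches records processed while the streams are still close together.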

It is pretty hard to tell what is going on with the SessionWindows, but
each time a new event is processed both the initializer and aggregator will
run. Additionally the merger will run for any sessions that need merging.
So there should be the same number of calls to the initializer and
aggregator. It might be worth taking a few thread dumps to try and work out
what it is doing.
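
The call pattern described above can be sketched for a single key as
follows. This is a simplified stand-alone model with hypothetical names,
not the Kafka Streams session store: every record triggers one initializer
call and one aggregator call, plus one merger call per existing session
that lies within the inactivity gap of the new record.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-key model of session-window aggregation call counts.
final class SessionWindowSketch {
    int initializerCalls;
    int aggregatorCalls;
    int mergerCalls;

    /** Open sessions, each stored as {startMs, endMs, recordCount}. */
    final List<long[]> sessions = new ArrayList<>();
    final long gapMs;

    SessionWindowSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    void process(long ts) {
        initializerCalls++;                     // fresh aggregate for the new record
        long[] merged = {ts, ts, 0};
        Iterator<long[]> it = sessions.iterator();
        while (it.hasNext()) {
            long[] s = it.next();
            // Mergeable if the session lies within the inactivity gap of this record.
            if (s[1] >= ts - gapMs && s[0] <= ts + gapMs) {
                mergerCalls++;
                merged[0] = Math.min(merged[0], s[0]);
                merged[1] = Math.max(merged[1], s[1]);
                merged[2] += s[2];
                it.remove();
            }
        }
        aggregatorCalls++;                      // add the new record to the aggregate
        merged[2] += 1;
        sessions.add(merged);
    }

    public static void main(String[] args) {
        SessionWindowSketch sketch = new SessionWindowSketch(5L * 60 * 1000);
        for (long ts : new long[] {0L, 60_000L, 1_000_000L, 500_000L, 700_000L}) {
            sketch.process(ts);
        }
        System.out.println("initializer=" + sketch.initializerCalls
                + " aggregator=" + sketch.aggregatorCalls
                + " merger=" + sketch.mergerCalls
                + " sessions=" + sketch.sessions.size());
    }
}
```

In this model the initializer and aggregator counts always equal the number
of processed records, so a large gap between the two in a real application
would be worth investigating.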

Thanks,
Damian



Re: Kafka Streams - Join synchronization issue

Posted by Marco Abitabile <ma...@gmail.com>.
Hello Eno,

yes, it is 1 second wide (I unintentionally wrote 1 minute in my previous
mail).
Just to give you more info about the data: location data has 1-second
resolution, while user activity data arrives at a varying rate; there may
be 10-100 records as well as 0 records within 1 second.

In the meantime I tried to start my reprocessing stage by syncing the two
streams (location data and user data).
Apparently it works, and joins are performed continuously.
What I'm experiencing now is that after the join I have a session window
aggregation that is creating new sessions at a very high rate. Right now the
stream app has ingested 6k activity data records; however, more than 500k
session windows have been created. Among these 500k sessions, around 100
(one hundred) have been aggregated with the MySession::aggregateSessions
function.
Also, the CPU is at 100% usage.
In this same stream app, at the beginning of March, Damian Guy found an
issue (related to caching and the session store) that he managed to fix
right afterwards. Right now I'm using the trunk version of Kafka 0.10.2.0.

The complete code is as follows:


//properties
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "UserSessionWithLocation");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
MySessionSerde.class.getName());
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
86400000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
100);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
30000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest");

//Other Stream: User Location, is a string with the name of the city the
//user is (like "San Francisco")

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> userLocationStream = locationStreamBuilder
    .stream(stringSerde, stringSerde,"userLocationStreamData");
KStream<String, String> locationKstream = userLocationStream
    .map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");

//This Stream: User Activity
KStream<String, JsonObject> activity = builder.stream(stringSerde,
jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
    .map(MyStreamUtils::enhanceWithScoreDetails)
    .join(
        locationKstream,
        MyStreamUtils::locationActivityJoiner,
        JoinWindows.of(1000).until(1000 * 60 * 5),
        stringSerde, jsonSerde, stringSerde)
    .through("usersWithLocation")

    .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
    .groupByKey(stringSerde, jsonSerde)
    .aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows
            .with(WINDOW_INACTIVITY_GAPS_MS) //5 minutes
            .until(WINDOW_MAINTAIN_DURATION_MS), // 7 minutes
        "aggregate_store")
    .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

KafkaStreams stream = new KafkaStreams(builder, propsActivity);
stream.start();


Do you see any issue here?

Thanks a lot.
Marco



Re: Kafka Streams - Join synchronization issue

Posted by Eno Thereska <en...@gmail.com>.
Hi Marco,

I noticed your window is 1 second width, not 1 minute width. Is that intentional?

Thanks
Eno
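
For reference, the arguments to JoinWindows.of(...) and until(...) in the
posted code are in milliseconds, so of(1000) is a one-second window and
until(1000 * 60 * 5) is five minutes. A small sketch of the conversions,
using only java.util.concurrent.TimeUnit (the constant names are
hypothetical):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical constants showing the millisecond values discussed in this thread.
final class WindowUnitsSketch {
    static final long ONE_SECOND_MS = TimeUnit.SECONDS.toMillis(1);
    static final long ONE_MINUTE_MS = TimeUnit.MINUTES.toMillis(1);
    static final long FIVE_MINUTES_MS = TimeUnit.MINUTES.toMillis(5);

    public static void main(String[] args) {
        System.out.println("1 second  = " + ONE_SECOND_MS + " ms");
        System.out.println("1 minute  = " + ONE_MINUTE_MS + " ms");
        System.out.println("5 minutes = " + FIVE_MINUTES_MS + " ms");
    }
}
```

Naming the durations this way makes it harder to confuse a one-second
window with a one-minute one.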


Re: Kafka Streams - Join synchronization issue

Posted by Marco Abitabile <ma...@gmail.com>.
Hello Eno,

thanks for your support. The two streams are both KStreams. The window
is 1 minute wide, with until set to 5 minutes. This is the code:


//Other Stream: User Location, is a string with the name of the city the
//user is (like "San Francisco")

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> userLocationStream = locationStreamBuilder
    .stream(stringSerde, stringSerde,"userLocationStreamData");
KStream<String, String> locationKstream = userLocationStream
    .map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");

//This Stream: User Activity
KStream<String, JsonObject> activity = builder.stream(stringSerde,
jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
    .map(MyStreamUtils::enhanceWithScoreDetails)
    .join(
        locationKstream,
        MyStreamUtils::locationActivityJoiner,
        JoinWindows.of(1000).until(1000 * 60 * 5),
        stringSerde, jsonSerde, stringSerde)
    .to("usersWithLocation");

KafkaStreams stream = new KafkaStreams(builder, propsActivity);
stream.start();


And MyStreamUtils::locationActivityJoiner does:
public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
    JsonObject join = activity.copy();
    join.put("city", loc);
    return join;
}


Hmm... your question got me thinking... are you telling me that since
both are KStreams, they actually need to be re-streamed in sync?

Thanks a lot.

Marco


2017-04-16 21:45 GMT+02:00 Eno Thereska <en...@gmail.com>:

> Hi Marco,
>
> Could you share a bit of your code, or at a minimum provide some info on:
> - are userActivitiesStream and geoDataStream KStreams or KTables?
> - what is the length of "timewindow"?
>
> Thanks
> Eno
>

Re: Kafka Streams - Join synchronization issue

Posted by Eno Thereska <en...@gmail.com>.
Hi Marco,

Could you share a bit of your code, or at a minimum provide some info on:
- are userActivitiesStream and geoDataStream KStreams or KTables?
- what is the length of "timewindow"?

Thanks
Eno
