Posted to users@kafka.apache.org by Ara Ebrahimi <ar...@argyledata.com> on 2016/09/08 20:20:48 UTC

enhancing KStream DSL

Let’s say I have this:


KStream<String, CallRecord>[] branches = allRecords
    .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> true
    );
KStream<String, CallRecord> callRecords = branches[0];
KStream<String, CallRecord> dataRecords = branches[1];
KStream<String, CallRecord> callRecordCounter = branches[2];

callRecordCounter
        .map((imsi, callRecord) -> new KeyValue<>("", ""))
        .countByKey(
                UnlimitedWindows.of("counter-window"),
                stringSerde
        )
        .print();
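To make the counting step concrete, here is a minimal plain-Java model of it. It is not Kafka Streams code: windowing is ignored and the class and method names are hypothetical. Because every record is re-keyed to the same empty-string key, a per-key count collapses into a single running total for the whole stream.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, not Kafka Streams code: windowing is ignored and the
// class/method names are hypothetical.
class SingleKeyCount {

    // Re-key every record to "" (as the map(...) step does), then count per key.
    static Map<String, Long> countAllUnderOneKey(List<String> originalKeys) {
        Map<String, Long> counts = new HashMap<>();
        for (String ignoredKey : originalKeys) {
            String newKey = ""; // the original key (the IMSI) is discarded
            counts.merge(newKey, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three records from two IMSIs all land under the single key "".
        Map<String, Long> counts = countAllUnderOneKey(Arrays.asList("imsi-1", "imsi-2", "imsi-1"));
        System.out.println(counts); // prints {=3}
    }
}
```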

Here I have three branches. Branch 0 is triggered if the record type is VOICE, branch 1 if it is DATA. Branch 2 is supposed to be triggered for every record, regardless of type, so that I can then count records over a time window. BUT the problem is that branch is implemented like this:

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
    @Override
    public void process(K key, V value) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(key, value)) {
                // use forward with childIndex here and then break the loop
                // so that no record is going to be piped to multiple streams
                context().forward(key, value, i);
                break;
            }
        }
    }
}

Note the break: a record is forwarded only to the first branch whose predicate matches, so the counter branch never sees VOICE or DATA records. I’d like to change the behavior of branch so that all predicates are checked and no break happens, say in a branchAll() method. What’s the easiest way to add this functionality to the DSL? I tried process(), but it doesn’t return a KStream.
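The difference between the loop above and the requested branchAll() can be sketched in plain Java. This is a model, not Kafka Streams code: records are reduced to their comm-type string, predicates use java.util.function.Predicate and ignore the key (as the lambdas above do), and branchAll is a hypothetical method, not part of the DSL discussed here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model, not Kafka Streams code. Records are reduced to their
// comm-type string; branchAll is hypothetical.
class BranchModel {

    // Mirrors the quoted loop: a record goes to the FIRST matching output only.
    static List<List<String>> branch(List<String> types, List<Predicate<String>> predicates) {
        return route(types, predicates, true);
    }

    // Hypothetical branchAll: a record goes to EVERY matching output.
    static List<List<String>> branchAll(List<String> types, List<Predicate<String>> predicates) {
        return route(types, predicates, false);
    }

    private static List<List<String>> route(List<String> types,
                                            List<Predicate<String>> predicates,
                                            boolean stopAtFirstMatch) {
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            outputs.add(new ArrayList<>());
        }
        for (String type : types) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(type)) {
                    outputs.get(i).add(type);
                    if (stopAtFirstMatch) {
                        break; // the early exit that starves the catch-all branch
                    }
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Predicate<String>> predicates = Arrays.asList(
                t -> "VOICE".equalsIgnoreCase(t),
                t -> "DATA".equalsIgnoreCase(t),
                t -> true);
        List<String> types = Arrays.asList("VOICE", "DATA", "SMS", "voice");
        System.out.println(branch(types, predicates));    // [[VOICE, voice], [DATA], [SMS]]
        System.out.println(branchAll(types, predicates)); // [[VOICE, voice], [DATA], [VOICE, DATA, SMS, voice]]
    }
}
```

With the `true` catch-all last, the first-match version routes only the unmatched SMS record to the third output, while the hypothetical branchAll sends every record there.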

Ara.




________________________________

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Thank you in advance for your cooperation.

________________________________

Re: enhancing KStream DSL

Posted by Ara Ebrahimi <ar...@argyledata.com>.
Ah, that works! Thanks! I was under the impression that these operations are chained sequentially in the DSL. I didn’t realize I can still use allRecords in parallel to the branches.

Ara.

> On Sep 9, 2016, at 5:27 AM, Michael Noll <mi...@confluent.io> wrote:

Re: enhancing KStream DSL

Posted by Michael Noll <mi...@confluent.io>.
Oh, my bad.

Updating the third predicate in `branch()` may not even be needed.

You could simply do:

    KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType())
            // Any callRecords that match neither of the two predicates above will be dropped.
        );

This would give you two branched streams instead of three:

    KStream<String, CallRecord> voiceRecords = branches[0];
    KStream<String, CallRecord> dataRecords = branches[1];
    // No third branched stream like before.

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.
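Reusing allRecords works because a stream can feed several downstream operations, each of which receives every record independently. A minimal plain-Java model of that fan-out follows; it is not Kafka API code, records are reduced to their comm-type string, and the class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model, not Kafka Streams code: one source hands every record to
// each of its children, so a two-way branch and a counter coexist.
class FanOutModel {
    final List<String> voice = new ArrayList<>();
    final List<String> data = new ArrayList<>();
    long total = 0;

    void run(List<String> allRecords) {
        // Two independent children of the same source stream.
        List<Consumer<String>> children = new ArrayList<>();
        children.add(this::twoWayBranch); // VOICE / DATA, everything else dropped
        children.add(type -> total++);    // counts every record
        for (String type : allRecords) {
            for (Consumer<String> child : children) {
                child.accept(type); // each child sees every record
            }
        }
    }

    private void twoWayBranch(String type) {
        if ("VOICE".equalsIgnoreCase(type)) {
            voice.add(type);
        } else if ("DATA".equalsIgnoreCase(type)) {
            data.add(type);
        }
        // A record matching neither predicate is dropped by this child only.
    }

    public static void main(String[] args) {
        FanOutModel m = new FanOutModel();
        m.run(Arrays.asList("VOICE", "DATA", "SMS"));
        System.out.println(m.voice + " " + m.data + " total=" + m.total); // [VOICE] [DATA] total=3
    }
}
```

The SMS record is missing from both branch outputs but is still counted, because the counter is attached to the source rather than to a branch.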



On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll <mi...@confluent.io> wrote:


Re: enhancing KStream DSL

Posted by Michael Noll <mi...@confluent.io>.
Ara,

you have shared this code snippet:

>    allRecords.branch(
>            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>            (imsi, callRecord) -> true
>    );

The branch() operation partitions the allRecords KStream into three
disjoint streams.
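That partition property can be checked with a small plain-Java model (not Kafka Streams code; names are hypothetical) that assigns each record the index of the first predicate it satisfies, or -1 when none match and the record is dropped. With a trailing `true` predicate, no record is dropped and the branch sizes sum to the input size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model, not Kafka Streams code, of branch() as a partition.
class PartitionModel {

    // Index of the first matching predicate, or -1 if the record is dropped.
    static int branchIndex(String type, List<Predicate<String>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(type)) {
                return i;
            }
        }
        return -1; // matched no predicate: not forwarded anywhere
    }

    // Size of each branch; the extra last slot counts dropped records.
    static int[] branchSizes(List<String> types, List<Predicate<String>> predicates) {
        int[] sizes = new int[predicates.size() + 1];
        for (String type : types) {
            int i = branchIndex(type, predicates);
            sizes[i >= 0 ? i : predicates.size()]++;
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Predicate<String>> withCatchAll = Arrays.asList(
                t -> "VOICE".equalsIgnoreCase(t),
                t -> "DATA".equalsIgnoreCase(t),
                t -> true);
        List<String> types = Arrays.asList("VOICE", "DATA", "SMS", "MMS");
        // Disjoint and exhaustive: 1 + 1 + 2 = 4 records, none dropped.
        System.out.println(Arrays.toString(branchSizes(types, withCatchAll))); // [1, 1, 2, 0]
    }
}
```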

I'd suggest the following.

First, update the third predicate in your `branch()` step to be "everything
but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
records are removed:


    KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            // Null-safe like the two predicates above: constants on the left.
            (imsi, callRecord) -> !("VOICE".equalsIgnoreCase(callRecord.getCallCommType())
                    || "DATA".equalsIgnoreCase(callRecord.getCallCommType()))
        );

This would give you:

    KStream<String, CallRecord> voiceRecords = branches[0];
    KStream<String, CallRecord> dataRecords = branches[1];
    KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData = branches[2];
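Since the third predicate is the exact complement of the first two, it can also be derived by composition instead of being written out by hand. A sketch, assuming java.util.function.BiPredicate as a stand-in for the Kafka Predicate<K, V> and the comm-type string as the value (the names are hypothetical); putting the constant on the left of equalsIgnoreCase keeps every predicate null-safe.

```java
import java.util.function.BiPredicate;

// Plain-Java sketch: BiPredicate stands in for the Kafka Predicate<K, V>,
// and the value is reduced to the comm-type string.
class ComplementPredicate {

    static final BiPredicate<String, String> IS_VOICE =
            (imsi, type) -> "VOICE".equalsIgnoreCase(type);
    static final BiPredicate<String, String> IS_DATA =
            (imsi, type) -> "DATA".equalsIgnoreCase(type);

    // "Neither VOICE nor DATA": the exact complement of the two predicates above.
    static final BiPredicate<String, String> IS_OTHER = IS_VOICE.or(IS_DATA).negate();

    public static void main(String[] args) {
        for (String type : new String[] {"VOICE", "data", "SMS", null}) {
            System.out.println(type + " -> other? " + IS_OTHER.test("imsi-1", type));
        }
        // VOICE -> other? false
        // data -> other? false
        // SMS -> other? true
        // null -> other? true
    }
}
```

To use such a predicate with branch(), it would be wrapped in a lambda over the real key and value types; that adaptation is not shown here.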

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.

-Michael





On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ar...@argyledata.com> wrote:
