Posted to user@beam.apache.org by Artur Mrozowski <ar...@gmail.com> on 2017/11/03 09:04:14 UTC

IllegalStateException when combining 3 streams?

Hi,
I am in the second week of our PoC with Beam and I am really amazed by the
capabilities of the framework and how well engineered it is.

Amazed does not mean experienced, so please bear with me.

What we try to achieve is to join several streams using windowing and
triggers. And that is where I fear we have hit the limits of what can be
done.

In case A we run in global windows and we are able to combine two unbounded
PCollections, but when I try to combine the results with a third collection I
get the exception below. I tried many different trigger combinations, but
can't make it work.

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))

In case B I use fixed windows. Again, I can successfully join two
collections and print the output in the console. When I add the third it runs
without errors, but I am not able to materialize the results in the console.
I am, however, able to print the results of the merge using Flatten, so the
error above is no longer an issue.

Does anyone have experience with joining three or more unbounded PCollections?
What would be a successful windowing and triggering strategy for global or
fixed windows, respectively?

Below are code snippets from the fixed windows case. Windows are defined in
the same manner for all three collections: customer, claim and policy. The
Join class I use comes from
https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java


I would be really grateful if any of you would like to share your knowledge.

Best Regards
Artur

PCollection<Claim2> claimInput = pipeline
        .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(claimTopic))
                .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
                .withoutMetadata())
        .apply(Values.<String>create())
        .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
        .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
                .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(1)));

 /**JOIN**************/
 PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
         Join.innerJoin(all_customers, all_policies);
 PCollectionList<KV<Integer, String>> collections =
         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
 PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
 //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);

 PCollection<KV<Integer, String>> merged =
         collections.apply(Flatten.<KV<Integer, String>>pCollections());

Re: IllegalStateException when combining 3 streams?

Posted by Artur Mrozowski <ar...@gmail.com>.
Yes, indeed. It works just fine with equally defined triggers in the global
window. Sorry about that and thank you sooooo much.
I can now leave the valley of despair and continue on my learning path ;)
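
For the archive, a minimal sketch of the change that made it work, reusing
the names from the snippets quoted below (trigger1, claimTopic, ParseClaim);
the only edit is wrapping claimInput's trigger in Repeatedly.forever so that
all three inputs carry identical triggers into the join:

PCollection<Claim2> claimInput = pipeline
        .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(claimTopic))
                .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
                .withoutMetadata())
        .apply(Values.<String>create())
        .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
        .apply(Window.<Claim2>into(new GlobalWindows())
                // was: .triggering(trigger1), which caused the mismatch
                .triggering(Repeatedly.forever(trigger1))
                .accumulatingFiredPanes());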

Best Regards
/Artur

On Sat, Nov 4, 2017 at 10:40 PM, Aleksandr <al...@gmail.com> wrote:

> Hello,
> You are using Repeatedly.forever for policyInput and for customerInput,
> but for claimInput your trigger is without Repeatedly.forever.
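>
> In code, from the snippets quoted below (only the wrapper differs):
>
>     // customerInput and policyInput:
>     .triggering(Repeatedly.forever(trigger1))
>     // claimInput, the odd one out:
>     .triggering(trigger1)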
>
>
> Best regards
> Aleksandr Gortujev
>
>
> On Sat, 4 Nov 2017 at 20:39, Artur Mrozowski <ar...@gmail.com> wrote:
>
>> And the error message:
>>
>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>> Flatten had incompatible triggers:
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)),
>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
>> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
>> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>> at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
>> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
>> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>> at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
>> at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
>> at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
>>
>>
>> On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <ar...@gmail.com> wrote:
>>
>>> Sure, here is the url to my repo:
>>> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>>>
>>> It is fairly simple and I do no grouping before: just read from Kafka
>>> and then join. I followed your advice and the 3-way join works for the
>>> pipeline with fixed windows. Both classes use the same join class. The
>>> logic is the same and it always works for the A+B join (CoGroupByKey).
>>> That's what makes me think it could be due to triggers. But I am of
>>> course not sure :)
>>>
>>> /Artur
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>>   Trigger trigger1 =
>>>           AfterProcessingTime
>>>                   .pastFirstElementInPane()
>>>                   .plusDelayOf(Duration.standardSeconds(10))
>>>           ;
>>>   /**PARSE CUSTOMER*/
>>>   PCollection<Customer3> customerInput = pipeline
>>>
>>>           .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>>>                   .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                   .withKeyDeserializer(StringDeserializer.class)
>>>                   .withValueDeserializer(StringDeserializer.class))
>>>                   .withoutMetadata())
>>>           .apply(Values.<String> create())
>>>           .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>   .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>       trigger1 ))
>>>           .accumulatingFiredPanes())
>>>           ;
>>>   /**PARSE POLICY*/
>>>   PCollection<Policy2> policyInput = pipeline
>>>           .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>>>                   .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                   .withKeyDeserializer(StringDeserializer.class)
>>>                   .withValueDeserializer(StringDeserializer.class))
>>>                   // .withWatermarkFn(new AddWatermarkFn())
>>>                   //.withTimestampFn2(new AddCustomerTimestampFn())
>>>                   .withoutMetadata())
>>>           .apply(Values.<String> create())
>>>           //.apply(ParseJsons.of(Customer.class));
>>>           .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>          .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>                   trigger1))
>>>                   .accumulatingFiredPanes())
>>>           ;
>>>  /**PARSE CLAIM**/
>>>   PCollection<Claim2> claimInput = pipeline
>>>           .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>                   .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                   .withKeyDeserializer(StringDeserializer.class)
>>>                   .withValueDeserializer(StringDeserializer.class))
>>>                   .withoutMetadata())
>>>           .apply(Values.<String> create())
>>>           .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>           .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>>>                   .accumulatingFiredPanes())
>>>           ;
>>>
>>>   /**CUSTOMER  ********/
>>>   PCollection<KV<Integer,String>> all_customers = customerInput
>>>           .apply(new ExtractAndMapCustomerKey())
>>>           ;
>>>   /***POLICY********/
>>>   PCollection<KV<Integer,String>> all_policies = policyInput
>>>           .apply(new ExtractAndMapPolicyKey())
>>>           ;
>>>   /***CLAIM*******/
>>>   PCollection<KV<Integer,String>> all_claims = claimInput
>>>           .apply(new ExtractAndMapClaimKey())
>>>           ;
>>>   /**JOIN**************/
>>>   /**This join works if I comment out the subsequent join**/
>>> // PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>
>>> /**This causes an exception**/
>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin3Way(all_customers,all_policies,all_claims);
>>>
>>>   /**this join will cause IllegalStateException **/
>>>  // PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>    // Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>
>>>    /**This will also cause an exception when used with 3 collections**/
>>>   //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>   //PCollection<KV<Integer,String>> merged= collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>
>>> And the 3 way join
>>>
>>> public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
>>>            final PCollection<KV<K, V1>> leftCollection,
>>>            final PCollection<KV<K, V2>> rightCollection
>>>            ,final PCollection<KV<K, V3>> thirdCollection)
>>> {
>>>
>>>        final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>        final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>        final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>
>>>        PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>                KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>                        .and(v2Tuple, rightCollection)
>>>                        .and(v3Tuple,thirdCollection)
>>>                        .apply(CoGroupByKey.<K>create());
>>>
>>>        System.out.println(coGbkResultCollection);
>>>
>>>        return coGbkResultCollection.apply(ParDo.of(
>>>                new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
>>>
>>>                    @ProcessElement
>>>                    public void processElement(ProcessContext c) {
>>>                        KV<K, CoGbkResult> e = c.element();
>>>
>>>                        Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
>>>                        Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
>>>                        Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);
>>>
>>>                        for(V3 thirdValue : thirdValuesIterable)
>>>                        {
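>>>                            // note: V3 values are iterated but never used or emitted here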
>>>
>>>                        }
>>>
>>>                        for (V1 leftValue : leftValuesIterable) {
>>>                            for (V2 rightValue : rightValuesIterable) {
>>>                                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
>>>                            }
>>>
>>>                        }
>>>                    }
>>>                }))
>>>                .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>>>                        KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
>>>                .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>>>                        KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
>>>                                ((KvCoder) rightCollection.getCoder()).getValueCoder())));
>>> }
>>>
>>>
>>> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <bc...@google.com>
>>> wrote:
>>>
>>>> Can you share the program using global windows and make sure the
>>>> exception is the same? Basically, this kind of problem has to do with
>>>> whether you have applied a grouping operation already. The triggering (and
>>>> sometimes windowing) before a grouping operation is different than after.
>>>> So, if you are having problems applying a GroupByKey somewhere and think
>>>> the windowing and triggering should be the same, you need to look before
>>>> that to see if one of the collections has been grouped but the others
>>>> haven't.
>>>>
>>>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <ar...@gmail.com> wrote:
>>>>
>>>>> Hi Ben,
>>>>> thank you for your answer. I think I forgot to mention that the join
>>>>> class already implements CoGroupByKey and comes from the SDK extensions. I
>>>>> have now modified it slightly to do a 3-way join (see below). It works for 3
>>>>> PCollections with fixed windows and provides the output this time, even
>>>>> when using early and late triggers. That is great!
>>>>>
>>>>> But when I try to use it with global windows for all three
>>>>> collections I get the same exception.
>>>>>
>>>>> Is it not possible to make a 3-way join in a global window? The reason
>>>>> why I want to use the global window is that I want to have all the historic
>>>>> records available in these collections. I am not sure how that could be
>>>>> achieved using fixed windows.
>>>>>
>>>>> /Artur
>>>>>
>>>>> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> innerJoin3Way(
>>>>>         final PCollection<KV<K, V1>> leftCollection,
>>>>>         final PCollection<KV<K, V2>> rightCollection
>>>>>         ,final PCollection<KV<K, V3>> thirdCollection)
>>>>> {
>>>>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>>>
>>>>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>>>                     .and(v2Tuple, rightCollection)
>>>>>                     .and(v3Tuple,thirdCollection)
>>>>>                     .apply(CoGroupByKey.<K>create());
>>>>>
>>>>>
>>>>>     return coGbkResultCollection.apply(ParDo.of(
>>>>>             new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>>>>>
>>>>>                 @ProcessElement
>>>>>                 public void processElement(ProcessContext c)
>>>>>
>>>>>
>>>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bc...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> It looks like this is a problematic interaction between the 2-layer
>>>>>> join and the (unfortunately complicated) continuation triggering.
>>>>>> Specifically, the triggering of Join(A, B) has the continuation trigger, so
>>>>>> it isn't possible to join that with C.
>>>>>>
>>>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>>>> CoGroupByKey. This will allow you to join all three input collections at
>>>>>> the same time, which should have two benefits. First, it will work since
>>>>>> you won't be trying to merge a continuation trigger with the original
>>>>>> trigger. Second, it should be more efficient, because you are performing a
>>>>>> single, three-way join instead of two, two-way joins.
>>>>>>
>>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>>>>> for more information.
>>>>>>
>>>>>> -- Ben
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <ar...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>>>>>>
>>>>>>> There are two scenarios that I've been trying out, but the operation
>>>>>>> is always the same: join setA+setB => setAB, and then join
>>>>>>> setAB+setC => setABC.
>>>>>>> In scenario 1 I define three PCollections with global windows and
>>>>>>> exactly the same triggers. Now I am able to join A+B, but as soon as I
>>>>>>> try to join AB+C I get the exception.
>>>>>>> Here is the code snippet where the window and triggers are the
>>>>>>> same for all three PCollections using global windows:
>>>>>>>
>>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>>
>>>>>>>  Trigger trigger1 =
>>>>>>>          AfterProcessingTime
>>>>>>>                  .pastFirstElementInPane()
>>>>>>>                  .plusDelayOf(Duration.standardSeconds(10))
>>>>>>>          ;
>>>>>>>  /**PARSE CUSTOMER*/
>>>>>>>  PCollection<Customer3> customerInput = pipeline
>>>>>>>
>>>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>>>>>>>
>>>>>>>
>>>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>
>>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>>>  .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>>>>>      trigger1 ))
>>>>>>>          .accumulatingFiredPanes())
>>>>>>>          ;
>>>>>>>  /**PARSE POLICY*/
>>>>>>>  PCollection<Policy2> policyInput = pipeline
>>>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>>>>>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          //.apply(ParseJsons.of(Customer.class));
>>>>>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>>>         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>>>>>                  trigger1))
>>>>>>>                  .accumulatingFiredPanes())
>>>>>>>          ;
>>>>>>> /**PARSE CLAIM**/
>>>>>>>
>>>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>
>>>>>>>          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>>>>>>>                  .accumulatingFiredPanes())
>>>>>>>          ;
>>>>>>>
>>>>>>>  /**CUSTOMER  ********/
>>>>>>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>>>>>>          .apply(new ExtractAndMapCustomerKey())
>>>>>>>          ;
>>>>>>>  /***POLICY********/
>>>>>>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>>>>>>          .apply(new ExtractAndMapPolicyKey())
>>>>>>>          ;
>>>>>>>  /***CLAIM*******/
>>>>>>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>>>>>>          .apply(new ExtractAndMapClaimKey())
>>>>>>>          ;
>>>>>>>  /**JOIN**************/
>>>>>>>  /**This join works if I comment out the subsequent join**/
>>>>>>>
>>>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>>>>
>>>>>>>  /**this join will cause IllegalStateException **/
>>>>>>>
>>>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The second scenario is using fixed windows. The logic is the same as
>>>>>>> above. Even in this case the triggering is equal for all three
>>>>>>> collections, like this:
>>>>>>>
>>>>>>> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>
>>>>>>> /**JOIN**************/
>>>>>>> /**NO ERROR this time **/
>>>>>>>
>>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>   Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>>
>>>>>>>
>>>>>>> This time I get no errors, but I am not able to print the results in
>>>>>>> the console.
>>>>>>>
>>>>>>> So I make another experiment and again define an equal trigger for all
>>>>>>> three collections. I can print the output to the console for the first two
>>>>>>> PCollections, but the second join again fails with IllegalStateException.
>>>>>>> Triggering definition for all three collections:
>>>>>>>
>>>>>>> /**PARSE CLAIM**/
>>>>>>>
>>>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>>>
>>>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>>>                  .withoutMetadata())
>>>>>>>          .apply(Values.<String> create())
>>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>
>>>>>>>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>>>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                                  //early update frequency
>>>>>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>>>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>>>>>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>>>>>                  .accumulatingFiredPanes());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> In this case I've added early and late firings, which emit results
>>>>>>> from the pane, but the second join again throws the exception.
>>>>>>>
>>>>>>> I know it's a lot of information to take in, but basically if you
>>>>>>> have an example where you join three PCollections in global and in fixed
>>>>>>> windows with appropriate triggering, I'd be eternally grateful :)
>>>>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Artur
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Artur,
>>>>>>>>
>>>>>>>> When you join the PCollections, they will be flattened and go
>>>>>>>> through a GroupByKey together. Since the trigger governs when the
>>>>>>>> GroupByKey can emit output, the triggers have to be equal or the GroupByKey
>>>>>>>> doesn't have a clear guide as to when it should output. If you can make the
>>>>>>>> triggering on all the input collections equal, that will resolve this
>>>>>>>> issue. If you still need different triggering elsewhere, that is fine. You
>>>>>>>> just need to make them the same going into the join.
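>>>>>>>>
>>>>>>>> A minimal sketch of this, assuming the all_customers and all_policies
>>>>>>>> collections and the Join class from the snippets earlier in the thread
>>>>>>>> (the AfterPane trigger at the end is only an example of different
>>>>>>>> triggering downstream):
>>>>>>>>
>>>>>>>> Window<KV<Integer, String>> sameWindowing =
>>>>>>>>         Window.<KV<Integer, String>>into(new GlobalWindows())
>>>>>>>>                 .triggering(Repeatedly.forever(
>>>>>>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>                                 .plusDelayOf(Duration.standardSeconds(10))))
>>>>>>>>                 .accumulatingFiredPanes();
>>>>>>>>
>>>>>>>> PCollection<KV<Integer, KV<String, String>>> joined = Join.innerJoin(
>>>>>>>>         all_customers.apply(sameWindowing),
>>>>>>>>         all_policies.apply(sameWindowing));
>>>>>>>>
>>>>>>>> // different triggering elsewhere is fine; reconfigure it after the join
>>>>>>>> joined.apply(Window.<KV<Integer, KV<String, String>>>configure()
>>>>>>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>>>>         .accumulatingFiredPanes());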
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> You can try putting the PCollection after Flatten into the same global
>>>>>>>>> window with the triggers it had before flattening.
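>>>>>>>>>
>>>>>>>>> A minimal sketch of this suggestion, assuming the collections list and
>>>>>>>>> the trigger1 definition from the snippets earlier in the thread:
>>>>>>>>>
>>>>>>>>> PCollection<KV<Integer, String>> merged = collections
>>>>>>>>>         .apply(Flatten.<KV<Integer, String>>pCollections())
>>>>>>>>>         // re-apply the same global window and trigger the inputs had
>>>>>>>>>         .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
>>>>>>>>>                 .triggering(Repeatedly.forever(trigger1))
>>>>>>>>>                 .accumulatingFiredPanes());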
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Aleksandr Gortujev
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>>

>                .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>                        KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
>                                ((KvCoder) rightCollection.getCoder()).getValueCoder())));
>
>
> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <bc...@google.com> wrote:
>
>> Can you share the program using global windows and make sure the
>> exception is the same? Basically, this kind of problem has to do with
>> whether you have applied a grouping operation already. The triggering (and
>> sometimes windowing) before a grouping operation is different than after.
>> So, if you are having problems applying a groupbykey somewhere and think
>> the windowing and triggering should be the same, you need to look before
>> that to see if one of the collections has been grouped but the others
>> haven't.
>>
>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <ar...@gmail.com> wrote:
>>
>>> Hej Ben,
>>> thank you for your answer. I think I forgot to mention that the join
>>> class already implements CoGroupByKey and comes from the SDK extensions.
>>> I have now modified it slightly to do a 3-way join (see below). It works
>>> for 3 PCollections with fixed windows and provides the output this time,
>>> even when using early and late triggers. That is great!
>>>
>>> But when I try to use it with global windows for all three collections
>>> I get the same exception.
>>>
>>> Is it not possible to make a 3-way join in a global window? The reason
>>> why I want to use the global window is that I want to have all the
>>> historic records available in these collections. Not sure how that could
>>> be achieved using fixed windows.
>>>
>>> /Artur
>>>
>>> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> innerJoin3Way(
>>>         final PCollection<KV<K, V1>> leftCollection,
>>>         final PCollection<KV<K, V2>> rightCollection
>>>         ,final PCollection<KV<K, V3>> thirdCollection)
>>> {
>>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>
>>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>                     .and(v2Tuple, rightCollection)
>>>                     .and(v3Tuple,thirdCollection)
>>>                     .apply(CoGroupByKey.<K>create());
>>>
>>>
>>>     return coGbkResultCollection.apply(ParDo.of(
>>>             new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>>>
>>>                 @ProcessElement
>>>                 public void processElement(ProcessContext c)
>>>
>>>
>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bc...@apache.org>
>>> wrote:
>>>
>>>> It looks like this is a problematic interaction between the 2-layer
>>>> join and the (unfortunately complicated) continuation triggering.
>>>> Specifically, the triggering of Join(A, B) has the continuation trigger, so
>>>> it isn't possible to join that with C.
>>>>
>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>> CoGroupByKey. This will allow you to join all three input collections at
>>>> the same time, which should have two benefits. First, it will work since
>>>> you won't be trying to merge a continuation trigger with the original
>>>> trigger. Second, it should be more efficient, because you are performing a
>>>> single, three-way join instead of two, two-way joins.
>>>>
>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>>> for more information.
>>>>
>>>> -- Ben
>>>>
>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <ar...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>>>
>>>>> So there are two scenarios that I've been trying out but operation is
>>>>> always the same, join setA+setB =>setAB, and then join setAB+setC=>setABC.
>>>>> In scenario 1 I define three PCollections with global windows and
>>>>> exactly the same triggers. Now I am able to join A+B but as soon as I
>>>>> try to join AB+C I get the exception.
>>>>> Here is the code snippet where the window and triggers are the same
>>>>> for all three PCollections using global windows:
>>>>>
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>>  Trigger trigger1 =
>>>>>          AfterProcessingTime
>>>>>                  .pastFirstElementInPane()
>>>>>                  .plusDelayOf(Duration.standardSeconds(10))
>>>>>          ;
>>>>>  /**PARSE CUSTOMER*/
>>>>>  PCollection<Customer3> customerInput = pipeline
>>>>>
>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>>>>>
>>>>>
>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String> create())
>>>>>
>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>  .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>>>      trigger1 ))
>>>>>          .accumulatingFiredPanes())
>>>>>          ;
>>>>>  /**PARSE POLICY*/
>>>>>  PCollection<Policy2> policyInput = pipeline
>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>>>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String> create())
>>>>>          //.apply(ParseJsons.of(Customer.class));
>>>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>>>                  trigger1))
>>>>>                  .accumulatingFiredPanes())
>>>>>          ;
>>>>> /**PARSE CLAIM**/
>>>>>
>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String> create())
>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>
>>>>>          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>>>>>                  .accumulatingFiredPanes())
>>>>>          ;
>>>>>
>>>>>  /**CUSTOMER  ********/
>>>>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>>>>          .apply(new ExtractAndMapCustomerKey())
>>>>>          ;
>>>>>  /***POLICY********/
>>>>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>>>>          .apply(new ExtractAndMapPolicyKey())
>>>>>          ;
>>>>>  /***CLAIM*******/
>>>>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>>>>          .apply(new ExtractAndMapClaimKey())
>>>>>          ;
>>>>>  /**JOIN**************/
>>>>>  /**This join works if I comment out the subsequent join**/
>>>>>
>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>>
>>>>>  /**this join will cause IllegalStateException **/
>>>>>
>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>
>>>>>
>>>>>
>>>>> The second scenario is using fixed windows. The logic is the same as
>>>>> above. Even in this case triggering is equal for all three collections like
>>>>> this
>>>>>
>>>>> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>
>>>>> /**JOIN**************/
>>>>> /**NO ERROR this time **/
>>>>>
>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>   Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>
>>>>>
>>>>> this time I get no errors but I am not able to print the results in
>>>>> the console.
>>>>>
>>>>> So, I make another experiment and again define equal trigger for all
>>>>> three collections. I can print the output to the console for the first two
>>>>> PCollections but the second join again fails with illegalStateException.
>>>>> Triggering definition for all three collections:
>>>>>
>>>>> /**PARSE CLAIM**/
>>>>>
>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>
>>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String> create())
>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>
>>>>>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                                  //early update frequency
>>>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>>>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>>>                  .accumulatingFiredPanes());
>>>>>
>>>>>
>>>>>
>>>>> In this case I've added early and late firings, which emit results
>>>>> from the pane, but the second join again throws the exception.
>>>>>
>>>>> I know it's a lot of information to take in but basically if you have
>>>>> an example where you join three PCollections in global and in fixed windows
>>>>> with appropriate triggering, I'd be eternally grateful:)
>>>>> Or if you could explain how to do it, of course. thanks in advance.
>>>>>
>>>>> Best Regards
>>>>> Artur
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Artur,
>>>>>>
>>>>>> When you join the PCollections, they will be flattened and go through
>>>>>> a GroupByKey together. Since the trigger governs when the GroupByKey can
>>>>>> emit output, the triggers have to be equal or the GroupByKey doesn't have a
>>>>>> clear guide as to when it should output. If you can make the triggering on
>>>>>> all the input collections equal, that will resolve this issue. If you still
>>>>>> need different triggering elsewhere, that is fine. You just need to make
>>>>>> them the same going in to the join.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> You can try put PCollection after flatten into same global window
>>>>>>> with triggers as it was before flattening.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Aleksandr Gortujev
>>>>>>>
>>>>>>>
>>>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> I am on second week of our PoC with Beam and I am really amazed by
>>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>>
>>>>>>> Amazed does not mean experienced so please bear with me.
>>>>>>>
>>>>>>> What  we try to achieve is to join several streams using windowing
>>>>>>> and triggers. And that is where I fear we hit the limitations  for what can
>>>>>>> be done.
>>>>>>>
>>>>>>> In case A we run in global windows and we are able to combine two
>>>>>>> unbounded PCollections but when I try to combine the results with third
>>>>>>> collection I get the exception below. I tried many different trigger
>>>>>>> combinations, but can't make it work.
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs
>>>>>>> to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>>> seconds))
>>>>>>>
>>>>>>> In case B I use fixed windows. Again, I can successfully  join two
>>>>>>> collections and print output in the console. When I add the third it runs
>>>>>>> without errors, but I am not able to materialize results in the console.
>>>>>>> Although I am able to print results of merge using Flatten so the error
>>>>>>> above is no longer an issue.
>>>>>>>
>>>>>>> Has anyone experience with joining three or more unbounded
>>>>>>> PCollections? What would be successful windowing, triggering strategy for
>>>>>>> global or fixed window respectively?
>>>>>>>
>>>>>>> Below code snippets from fixed windows case. Windows are defined in
>>>>>>> the same manner for all three collections, customer, claim and policy. The
>>>>>>> Join class I use comes from
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>>
>>>>>>>
>>>>>>> Would be really grateful if any of you would like to share your
>>>>>>> knowledge.
>>>>>>>
>>>>>>> Best Regard
>>>>>>> Artur
>>>>>>>
>>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>>
>>>>>>>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>>>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>                 .withoutMetadata())
>>>>>>>         .apply(Values.<String> create())
>>>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>
>>>>>>>  /**JOIN**************/
>>>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>>>> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>>>>>>>
>>>>>>> PCollection<KV<Integer,String>> merged=
>>>>>>> collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>

Re: IllegalStateException when combining 3 streams?

Posted by Artur Mrozowski <ar...@gmail.com>.
Sure, here is the url to my repo
https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java


It is fairly simple and I do no grouping before. Just read from Kafka and
then join. I followed your advice and the 3-way join works for the pipeline
with fixed windows. Both classes use the same join class. The logic is the
same and it always works for the A+B join (CoGroupByKey). That's what makes
me think it could be due to triggers. But I am of course not sure :)

/Artur

Pipeline pipeline = Pipeline.create(options);

  Trigger trigger1 =
          AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(10))
          ;
  /**PARSE CUSTOMER*/
  PCollection<Customer3> customerInput = pipeline
          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                  .withKeyDeserializer(StringDeserializer.class)
                  .withValueDeserializer(StringDeserializer.class))
                  .withoutMetadata())
          .apply(Values.<String> create())
          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
          .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
                  trigger1))
                  .accumulatingFiredPanes())
          ;
  /**PARSE POLICY*/
  PCollection<Policy2> policyInput = pipeline
          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                  .withKeyDeserializer(StringDeserializer.class)
                  .withValueDeserializer(StringDeserializer.class))
                  // .withWatermarkFn(new AddWatermarkFn())
                  //.withTimestampFn2(new AddCustomerTimestampFn())
                  .withoutMetadata())
          .apply(Values.<String> create())
          //.apply(ParseJsons.of(Customer.class));
          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
          .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
                  trigger1))
                  .accumulatingFiredPanes())
          ;
 /**PARSE CLAIM**/
  PCollection<Claim2> claimInput = pipeline
          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                  .withKeyDeserializer(StringDeserializer.class)
                  .withValueDeserializer(StringDeserializer.class))
                  .withoutMetadata())
          .apply(Values.<String> create())
          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
                  .accumulatingFiredPanes())
          ;

  /**CUSTOMER  ********/
  PCollection<KV<Integer,String>> all_customers = customerInput
          .apply(new ExtractAndMapCustomerKey())
          ;
  /***POLICY********/
  PCollection<KV<Integer,String>> all_policies = policyInput
          .apply(new ExtractAndMapPolicyKey())
          ;
  /***CLAIM*******/
  PCollection<KV<Integer,String>> all_claims = claimInput
          .apply(new ExtractAndMapClaimKey())
          ;
  /**JOIN**************/
  /**This join works if I comment out the subsequent join**/
  // PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies = Join.innerJoin(all_customers,all_policies);

  /**This causes an exception**/
  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies = Join.innerJoin3Way(all_customers,all_policies,all_claims);

  /**this join will cause IllegalStateException **/
  // PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
  //     Join.innerJoin(joinedCustomersAndPolicies,all_claims);

  /**This will also cause an exception when used with 3 collections**/
  // PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
  // PCollection<KV<Integer,String>> merged = collections.apply(Flatten.<KV<Integer,String>>pCollections());

And the 3 way join

public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
           final PCollection<KV<K, V1>> leftCollection,
           final PCollection<KV<K, V2>> rightCollection
           ,final PCollection<KV<K, V3>> thirdCollection)
{

       final TupleTag<V1> v1Tuple = new TupleTag<>();
       final TupleTag<V2> v2Tuple = new TupleTag<>();
       final TupleTag<V3> v3Tuple = new TupleTag<>();

       PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
               KeyedPCollectionTuple.of(v1Tuple, leftCollection)
                       .and(v2Tuple, rightCollection)
                       .and(v3Tuple,thirdCollection)
                       .apply(CoGroupByKey.<K>create());

       System.out.println(coGbkResultCollection);

       return coGbkResultCollection.apply(ParDo.of(
               new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {

                   @ProcessElement
                   public void processElement(ProcessContext c) {
                       KV<K, CoGbkResult> e = c.element();

                       Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
                       Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
                       Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);

                       for(V3 thirdValue : thirdValuesIterable)
                       {
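                           // note: nothing is emitted here; the V3 (claim) values
                           // are read but unused, and the output type / coder below
                           // only carry the (V1, V2) pair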

                       }

                       for (V1 leftValue : leftValuesIterable) {
                           for (V2 rightValue : rightValuesIterable) {
                               c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
                           }

                       }
                   }
               }))
               .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
                       KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
                               ((KvCoder) rightCollection.getCoder()).getValueCoder())));


On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <bc...@google.com> wrote:

> Can you share the program using global windows and make sure the exception
> is the same? Basically, this kind of problem has to do with whether you
> have applied a grouping operation already. The triggering (and sometimes
> windowing) before a grouping operation is different than after. So, if you
> are having problems applying a groupbykey somewhere and think the windowing
> and triggering should be the same, you need to look before that to see if
> one of the collections has been grouped but the others haven't.
>
> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <ar...@gmail.com> wrote:
>
>> Hej Ben,
>> thank you for your answer. I think I forgot to mention that the join
>> class already implements CoGroupByKey and comes from the SDK extensions. I
>> have now modified it slightly to do a 3-way join (see below). It works for
>> 3 PCollections with fixed windows and provides the output this time, even
>> when using early and late triggers. That is great!
>>
>> But when I try to use it with global windows for all three collections I
>> get the same exception.
>>
>> Is it not possible to make a 3-way join in a global window? The reason why
>> I want to use the global window is that I want to have all the historic
>> records available in these collections. Not sure how that could be achieved
>> using fixed windows.
>>
>> /Artur
>>
>> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> innerJoin3Way(
>>         final PCollection<KV<K, V1>> leftCollection,
>>         final PCollection<KV<K, V2>> rightCollection
>>         ,final PCollection<KV<K, V3>> thirdCollection)
>> {
>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>
>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>                     .and(v2Tuple, rightCollection)
>>                     .and(v3Tuple,thirdCollection)
>>                     .apply(CoGroupByKey.<K>create());
>>
>>
>>     return coGbkResultCollection.apply(ParDo.of(
>>             new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>>
>>                 @ProcessElement
>>                 public void processElement(ProcessContext c)
>>
>>
>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bc...@apache.org>
>> wrote:
>>
>>> It looks like this is a problematic interaction between the 2-layer join
>>> and the (unfortunately complicated) continuation triggering. Specifically,
>>> the triggering of Join(A, B) has the continuation trigger, so it isn't
>>> possible to join that with C.
>>>
>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>> CoGroupByKey. This will allow you to join all three input collections at
>>> the same time, which should have two benefits. First, it will work since
>>> you won't be trying to merge a continuation trigger with the original
>>> trigger. Second, it should be more efficient, because you are performing a
>>> single, three-way join instead of two, two-way joins.
>>>
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>> for more information.
>>>
>>> -- Ben
>>>
>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <ar...@gmail.com> wrote:
>>>
>>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>>
>>>> So there are two scenarios that I've been trying out but operation is
>>>> always the same, join setA+setB =>setAB, and then join setAB+setC=>setABC.
>>>> In scenario 1 I define three PCollections with global windows and
>>>> exactly the same triggers. Now I am able to join A+B but as soon as I
>>>> try to join AB+C I get the exception.
>>>> Here is the code snippet where the window and triggers are the same
>>>> for all three PCollections using global windows:
>>>>
>>>> Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>  Trigger trigger1 =
>>>>          AfterProcessingTime
>>>>                  .pastFirstElementInPane()
>>>>                  .plusDelayOf(Duration.standardSeconds(10))
>>>>          ;
>>>>  /**PARSE CUSTOMER*/
>>>>  PCollection<Customer3> customerInput = pipeline
>>>>
>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>>>>
>>>>
>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>                  .withoutMetadata())
>>>>          .apply(Values.<String> create())
>>>>
>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>  .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>>      trigger1 ))
>>>>          .accumulatingFiredPanes())
>>>>          ;
>>>>  /**PARSE POLICY*/
>>>>  PCollection<Policy2> policyInput = pipeline
>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>>>>                  .withoutMetadata())
>>>>          .apply(Values.<String> create())
>>>>          //.apply(ParseJsons.of(Customer.class));
>>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>>                  trigger1))
>>>>                  .accumulatingFiredPanes())
>>>>          ;
>>>> /**PARSE CLAIM**/
>>>>
>>>>  PCollection<Claim2> claimInput = pipeline
>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>                  .withoutMetadata())
>>>>          .apply(Values.<String> create())
>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>
>>>>          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>>>>                  .accumulatingFiredPanes())
>>>>          ;
>>>>
>>>>  /**CUSTOMER  ********/
>>>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>>>          .apply(new ExtractAndMapCustomerKey())
>>>>          ;
>>>>  /***POLICY********/
>>>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>>>          .apply(new ExtractAndMapPolicyKey())
>>>>          ;
>>>>  /***CLAIM*******/
>>>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>>>          .apply(new ExtractAndMapClaimKey())
>>>>          ;
>>>>  /**JOIN**************/
>>>>  /**This join works if I comment out the subsequent join**/
>>>>
>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>
>>>>  /**this join will cause IllegalStateException **/
>>>>
>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>
>>>>
>>>>
>>>> The second scenario is using fixed windows. The logic is the same as
>>>> above. Even in this case triggering is equal for all three collections like
>>>> this
>>>>
>>>> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>> /**JOIN**************/
>>>> /**NO ERROR this time **/
>>>>
>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>   Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>
>>>>
>>>> this time I get no errors but I am not able to print the results in the
>>>> console.
>>>>
>>>> So, I make another experiment and again define equal trigger for all
>>>> three collections. I can print the output to the console for the first two
>>>> PCollections but the second join again fails with illegalStateException.
>>>> Triggering definition for all three collections:
>>>>
>>>> /**PARSE CLAIM**/
>>>>
>>>>  PCollection<Claim2> claimInput = pipeline
>>>>
>>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>                  .withoutMetadata())
>>>>          .apply(Values.<String> create())
>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>
>>>>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                                  //early update frequency
>>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>>                  .accumulatingFiredPanes());
>>>>
>>>>
>>>>
>>>> In this case I've added early and late firings, which emit results
>>>> from the pane, but the second join again throws the exception.
>>>>
>>>> I know it's a lot of information to take in but basically if you have
>>>> an example where you join three PCollections in global and in fixed windows
>>>> with appropriate triggering, I'd be eternally grateful:)
>>>> Or if you could explain how to do it, of course. thanks in advance.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com> wrote:
>>>>
>>>>> Hi Artur,
>>>>>
>>>>> When you join the PCollections, they will be flattened and go through
>>>>> a GroupByKey together. Since the trigger governs when the GroupByKey can
>>>>> emit output, the triggers have to be equal or the GroupByKey doesn't have a
>>>>> clear guide as to when it should output. If you can make the triggering on
>>>>> all the input collections equal, that will resolve this issue. If you still
>>>>> need different triggering elsewhere, that is fine. You just need to make
>>>>> them the same going in to the join.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> You can try put PCollection after flatten into same global window
>>>>>> with triggers as it was before flattening.
>>>>>>
>>>>>> Best regards
>>>>>> Aleksandr Gortujev
>>>>>>
>>>>>>
>>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> I am on second week of our PoC with Beam and I am really amazed by
>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>
>>>>>> Amazed does not mean experienced so please bear with me.
>>>>>>
>>>>>> What  we try to achieve is to join several streams using windowing
>>>>>> and triggers. And that is where I fear we hit the limitations  for what can
>>>>>> be done.
>>>>>>
>>>>>> In case A we run in global windows and we are able to combine two
>>>>>> unbounded PCollections but when I try to combine the results with third
>>>>>> collection I get the exception below. I tried many different trigger
>>>>>> combinations, but can't make it work.
>>>>>>
>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs
>>>>>> to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>> seconds))
>>>>>>
>>>>>> In case B I use fixed windows. Again, I can successfully  join two
>>>>>> collections and print output in the console. When I add the third it runs
>>>>>> without errors, but I am not able to materialize results in the console.
>>>>>> Although I am able to print results of merge using Flatten so the error
>>>>>> above is no longer an issue.
>>>>>>
>>>>>> Has anyone experience with joining three or more unbounded
>>>>>> PCollections? What would be successful windowing, triggering strategy for
>>>>>> global or fixed window respectively?
>>>>>>
>>>>>> Below code snippets from fixed windows case. Windows are defined in
>>>>>> the same manner for all three collections, customer, claim and policy. The
>>>>>> Join class I use comes from
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>
>>>>>>
>>>>>> Would be really grateful if any of you would like to share your
>>>>>> knowledge.
>>>>>>
>>>>>> Best Regard
>>>>>> Artur
>>>>>>
>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>
>>>>>>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>                 .withoutMetadata())
>>>>>>         .apply(Values.<String> create())
>>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>
>>>>>>  /**JOIN**************/
>>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>>> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>>>>>>
>>>>>> PCollection<KV<Integer,String>> merged=
>>>>>> collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>

Re: IllegalStateException when combining 3 streams?

Posted by Ben Chambers <bc...@google.com>.
Can you share the program using global windows and make sure the exception
is the same? Basically, this kind of problem has to do with whether you
have applied a grouping operation already. The triggering (and sometimes
windowing) before a grouping operation is different than after. So, if you
are having problems applying a groupbykey somewhere and think the windowing
and triggering should be the same, you need to look before that to see if
one of the collections has been grouped but the others haven't.
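
If the grouping inside the first join is what changed the triggering, one
possible workaround (an untested sketch, reusing the trigger1 and collection
names from your earlier snippet) is to re-apply the original windowing to the
output of the first join before the second one, for example:

PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
    Join.innerJoin(all_customers, all_policies)
        // replace the continuation trigger installed by the GroupByKey
        // inside the join with the original triggering, so this collection
        // is again compatible with all_claims
        .apply(Window.<KV<Integer, KV<String, String>>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(trigger1))
            .accumulatingFiredPanes());

That said, a single three-way CoGroupByKey is still the simpler and more
efficient option.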

On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <ar...@gmail.com> wrote:

> Hej Ben,
> thank you for your answer. I think I forgot to mention that the join class
> already implements CoGroupByKey and comes from the SDK extensions. I have
> now modified it slightly to do a 3-way join (see below). It works for 3
> PCollections with fixed windows and provides the output this time, even
> when using early and late triggers. That is great!
>
> But when I try to use it with global windows for all three collections I
> get the same exception.
>
> Is it not possible to make a 3-way join in a global window? The reason why
> I want to use the global window is that I want to have all the historic
> records available in these collections. Not sure how that could be achieved
> using fixed windows.
>
> /Artur
>
> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> innerJoin3Way(
>         final PCollection<KV<K, V1>> leftCollection,
>         final PCollection<KV<K, V2>> rightCollection
>         ,final PCollection<KV<K, V3>> thirdCollection)
> {
>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>
>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>                     .and(v2Tuple, rightCollection)
>                     .and(v3Tuple,thirdCollection)
>                     .apply(CoGroupByKey.<K>create());
>
>
>     return coGbkResultCollection.apply(ParDo.of(
>             new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>
>                 @ProcessElement
>                 public void processElement(ProcessContext c)
>
>
> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bc...@apache.org> wrote:
>
>> It looks like this is a problematic interaction between the 2-layer join
>> and the (unfortunately complicated) continuation triggering. Specifically,
>> the triggering of Join(A, B) has the continuation trigger, so it isn't
>> possible to join that with C.
>>
>> Instead of trying to do Join(Join(A, B), C), consider using a
>> CoGroupByKey. This will allow you to join all three input collections at
>> the same time, which should have two benefits. First, it will work since
>> you won't be trying to merge a continuation trigger with the original
>> trigger. Second, it should be more efficient, because you are performing a
>> single, three-way join instead of two, two-way joins.
>>
>>
>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html for
>> more information.
>>
>> -- Ben
>>
>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <ar...@gmail.com> wrote:
>>
>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>
>>> So there are two scenarios that I've been trying out but operation is
>>> always the same, join setA+setB =>setAB, and then join setAB+setC=>setABC.
>>> In scenario 1 I define three PCollections with global windows and
>>> exactly the same triggers. Now I am able to join A+B but as soon as I
>>> try to join AB+C I get the exception.
>>> Here is the code snippet where the window and triggers are the same
>>> for all three PCollections using global windows:
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>>  Trigger trigger1 =
>>>          AfterProcessingTime
>>>                  .pastFirstElementInPane()
>>>                  .plusDelayOf(Duration.standardSeconds(10))
>>>          ;
>>>  /**PARSE CUSTOMER*/
>>>  PCollection<Customer3> customerInput = pipeline
>>>
>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>>>
>>>
>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>                  .withValueDeserializer(StringDeserializer.class))
>>>                  .withoutMetadata())
>>>          .apply(Values.<String> create())
>>>
>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>  .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>      trigger1 ))
>>>          .accumulatingFiredPanes())
>>>          ;
>>>  /**PARSE POLICY*/
>>>  PCollection<Policy2> policyInput = pipeline
>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>                  .withValueDeserializer(StringDeserializer.class))
>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>>>                  .withoutMetadata())
>>>          .apply(Values.<String> create())
>>>          //.apply(ParseJsons.of(Customer.class));
>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>>                  trigger1))
>>>                  .accumulatingFiredPanes())
>>>          ;
>>> /**PARSE CLAIM**/
>>>
>>>  PCollection<Claim2> claimInput = pipeline
>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>                  .withValueDeserializer(StringDeserializer.class))
>>>                  .withoutMetadata())
>>>          .apply(Values.<String> create())
>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>
>>>          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>>>                  .accumulatingFiredPanes())
>>>          ;
>>>
>>>  /**CUSTOMER  ********/
>>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>>          .apply(new ExtractAndMapCustomerKey())
>>>          ;
>>>  /***POLICY********/
>>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>>          .apply(new ExtractAndMapPolicyKey())
>>>          ;
>>>  /***CLAIM*******/
>>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>>          .apply(new ExtractAndMapClaimKey())
>>>          ;
>>>  /**JOIN**************/
>>>  /**This join works if I comment out the subsequent join**/
>>>
>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>
>>>  /**this join will cause IllegalStateException **/
>>>
>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>
>>>
>>>
>>> The second scenario is using fixed windows. The logic is the same as
>>> above. Even in this case triggering is equal for all three collections like
>>> this
>>>
>>> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>
>>> /**JOIN**************/
>>> /**NO ERROR this time **/
>>>
>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>   Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>
>>>
>>> this time I get no errors but I am not able to print the results in the
>>> console.
>>>
>>> So, I make another experiment and again define equal trigger for all
>>> three collections. I can print the output to the console for the first two
>>> PCollections but the second join again fails with illegalStateException.
>>> Triggering definition for all three collections:
>>>
>>> /**PARSE CLAIM**/
>>>
>>>  PCollection<Claim2> claimInput = pipeline
>>>
>>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>                  .withValueDeserializer(StringDeserializer.class))
>>>                  .withoutMetadata())
>>>          .apply(Values.<String> create())
>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>
>>>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>                                  //early update frequency
>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>                  .accumulatingFiredPanes());
>>>
>>>
>>>
>>> In this case I've added early and late firings, which emit results from
>>> the pane, but the second join again throws the exception.
>>>
>>> I know it's a lot of information to take in but basically if you have an
>>> example where you join three PCollections in global and in fixed windows
>>> with appropriate triggering, I'd be eternally grateful:)
>>> Or if you could explain how to do it, of course. thanks in advance.
>>>
>>> Best Regards
>>> Artur
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> Hi Artur,
>>>>
>>>> When you join the PCollections, they will be flattened and go through a
>>>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>>>> output, the triggers have to be equal or the GroupByKey doesn't have a
>>>> clear guide as to when it should output. If you can make the triggering on
>>>> all the input collections equal, that will resolve this issue. If you still
>>>> need different triggering elsewhere, that is fine. You just need to make
>>>> them the same going in to the join.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>> You can try put PCollection after flatten into same global window with
>>>>> triggers as it was before flattening.
>>>>>
>>>>> Best regards
>>>>> Aleksandr Gortujev
>>>>>
>>>>>
>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>> I am on second week of our PoC with Beam and I am really amazed by the
>>>>> capabilities of the framework and how well engineered it is.
>>>>>
>>>>> Amazed does not mean experienced so please bear with me.
>>>>>
>>>>> What  we try to achieve is to join several streams using windowing and
>>>>> triggers. And that is where I fear we hit the limitations  for what can be
>>>>> done.
>>>>>
>>>>> In case A we run in global windows and we are able to combine two
>>>>> unbounded PCollections but when I try to combine the results with third
>>>>> collection I get the exception below. I tried many different trigger
>>>>> combinations, but can't make it work.
>>>>>
>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>>>> Flatten had incompatible triggers:
>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>> seconds))
>>>>>
>>>>> In case B I use fixed windows. Again, I can successfully  join two
>>>>> collections and print output in the console. When I add the third it runs
>>>>> without errors, but I am not able to materialize results in the console.
>>>>> Although I am able to print results of merge using Flatten so the error
>>>>> above is no longer an issue.
>>>>>
>>>>> Has anyone experience with joining three or more unbounded
>>>>> PCollections? What would be successful windowing, triggering strategy for
>>>>> global or fixed window respectively?
>>>>>
>>>>> Below code snippets from fixed windows case. Windows are defined in
>>>>> the same manner for all three collections, customer, claim and policy. The
>>>>> Join class I use comes from
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>
>>>>>
>>>>> Would be really grateful if any of you would like to share your
>>>>> knowledge.
>>>>>
>>>>> Best Regard
>>>>> Artur
>>>>>
>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>
>>>>>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>                 .withoutMetadata())
>>>>>         .apply(Values.<String> create())
>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>
>>>>>  /**JOIN**************/
>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>>>>>
>>>>> PCollection<KV<Integer,String>> merged=
>>>>> collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>>>
>>>>>
>>>>>
>>>>
>>>
>

Re: IllegalStateException when combining 3 streams?

Posted by Artur Mrozowski <ar...@gmail.com>.
Hej Ben,
thank you for your answer. I think I forgot to mention that the join class
already implements CoGroupByKey and comes from the SDK extensions. I have
now modified it slightly to do a 3-way join (see below). It works for 3
PCollections with fixed windows and provides the output this time, even
when using early and late triggers. That is great!

But when I try to use it with global windows for all three collections I
get the same exception.

Is it not possible to make a 3-way join in a global window? The reason why
I want to use the global window is that I want to have all the historic
records available in these collections. Not sure how that could be achieved
using fixed windows.

/Artur

public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> innerJoin3Way(
        final PCollection<KV<K, V1>> leftCollection,
        final PCollection<KV<K, V2>> rightCollection
        ,final PCollection<KV<K, V3>> thirdCollection)
{
    final TupleTag<V1> v1Tuple = new TupleTag<>();
    final TupleTag<V2> v2Tuple = new TupleTag<>();
    final TupleTag<V3> v3Tuple = new TupleTag<>();

    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
            KeyedPCollectionTuple.of(v1Tuple, leftCollection)
                    .and(v2Tuple, rightCollection)
                    .and(v3Tuple,thirdCollection)
                    .apply(CoGroupByKey.<K>create());


    return coGbkResultCollection.apply(ParDo.of(
            new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<K, CoGbkResult> e = c.element();
                    // Inner-join semantics: emit the cross product of the
                    // three sides for this key, mirroring what the two-way
                    // Join.innerJoin from the extension library does.
                    for (V1 v1 : e.getValue().getAll(v1Tuple)) {
                        for (V2 v2 : e.getValue().getAll(v2Tuple)) {
                            for (V3 v3 : e.getValue().getAll(v3Tuple)) {
                                c.output(KV.of(e.getKey(), KV.of(KV.of(v1, v2), v3)));
                            }
                        }
                    }
                }
            }));
}
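
For completeness, this is how I call it (same collection names as in my
earlier snippets):

PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
    innerJoin3Way(all_customers, all_policies, all_claims);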


On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <bc...@apache.org> wrote:

> It looks like this is a problematic interaction between the 2-layer join
> and the (unfortunately complicated) continuation triggering. Specifically,
> the triggering of Join(A, B) has the continuation trigger, so it isn't
> possible to join that with C.
>
> Instead of trying to do Join(Join(A, B), C), consider using a
> CoGroupByKey. This will allow you to join all three input collections at
> the same time, which should have two benefits. First, it will work since
> you won't be trying to merge a continuation trigger with the original
> trigger. Second, it should be more efficient, because you are performing a
> single, three-way join instead of two, two-way joins.
>
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
> for more information.
>
> -- Ben
>
> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <ar...@gmail.com> wrote:
>
>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>
>> So there are two scenarios that I've been trying out, but the operation is
>> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
>> In scenario 1 I define three PCollections with global windows and exactly
>> the same triggers. Now I am able to join A+B, but as soon as I try to join
>> AB+C I get the exception. Here is the code snippet where the window and triggers are the same for
>> all three PCollections using global windows:
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>>  Trigger trigger1 =
>>          AfterProcessingTime
>>                  .pastFirstElementInPane()
>>                  .plusDelayOf(Duration.standardSeconds(10))
>>          ;
>>  /**PARSE CUSTOMER*/
>>  PCollection<Customer3> customerInput = pipeline
>>
>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>>
>>
>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>                  .withKeyDeserializer(StringDeserializer.class)
>>                  .withValueDeserializer(StringDeserializer.class))
>>                  .withoutMetadata())
>>          .apply(Values.<String> create())
>>
>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>  .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>      trigger1 ))
>>          .accumulatingFiredPanes())
>>          ;
>>  /**PARSE POLICY*/
>>  PCollection<Policy2> policyInput = pipeline
>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>                  .withKeyDeserializer(StringDeserializer.class)
>>                  .withValueDeserializer(StringDeserializer.class))
>>                  // .withWatermarkFn(new AddWatermarkFn())
>>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>>                  .withoutMetadata())
>>          .apply(Values.<String> create())
>>          //.apply(ParseJsons.of(Customer.class));
>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>>                  trigger1))
>>                  .accumulatingFiredPanes())
>>          ;
>> /**PARSE CLAIM**/
>>
>>  PCollection<Claim2> claimInput = pipeline
>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>                  .withKeyDeserializer(StringDeserializer.class)
>>                  .withValueDeserializer(StringDeserializer.class))
>>                  .withoutMetadata())
>>          .apply(Values.<String> create())
>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>
>>          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>>                  .accumulatingFiredPanes())
>>          ;
>>
>>  /**CUSTOMER  ********/
>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>          .apply(new ExtractAndMapCustomerKey())
>>          ;
>>  /***POLICY********/
>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>          .apply(new ExtractAndMapPolicyKey())
>>          ;
>>  /***CLAIM*******/
>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>          .apply(new ExtractAndMapClaimKey())
>>          ;
>>  /**JOIN**************/
>>  /**This join works if I comment out the subsequent join**/
>>
>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>
>>  /**this join will cause IllegalStateException **/
>>
>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>
>>
>>
>> The second scenario uses fixed windows. The logic is the same as
>> above. Even in this case, triggering is equal for all three collections, like
>> this:
>>
>> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>
>> /**JOIN**************/
>> /**NO ERROR this time **/
>>
>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>   Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>
>>
>> This time I get no errors, but I am not able to print the results in the
>> console.
>>
>> So, I make another experiment and again define an equal trigger for all
>> three collections. I can print the output to the console for the first two
>> PCollections, but the second join again fails with IllegalStateException.
>> Triggering definition for all three collections:
>>
>> /**PARSE CLAIM**/
>>
>>  PCollection<Claim2> claimInput = pipeline
>>
>>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>                  .withKeyDeserializer(StringDeserializer.class)
>>                  .withValueDeserializer(StringDeserializer.class))
>>                  .withoutMetadata())
>>          .apply(Values.<String> create())
>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>
>>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                                  //early update frequency
>>                                  .alignedTo(Duration.standardSeconds(10)))
>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>                  .accumulatingFiredPanes());
>>
>>
>>
>> In this case I've added early and late firings, which emit results from
>> the pane, but the second join again throws the exception.
>>
>> I know it's a lot of information to take in but basically if you have an
>> example where you join three PCollections in global and in fixed windows
>> with appropriate triggering, I'd be eternally grateful:)
>> Or if you could explain how to do it, of course. Thanks in advance.
>>
>> Best Regards
>> Artur
>>
>>
>>
>>
>>
>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com> wrote:
>>
>>> Hi Artur,
>>>
>>> When you join the PCollections, they will be flattened and go through a
>>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>>> output, the triggers have to be equal or the GroupByKey doesn't have a
>>> clear guide as to when it should output. If you can make the triggering on
>>> all the input collections equal, that will resolve this issue. If you still
>>> need different triggering elsewhere, that is fine. You just need to make
>>> them the same going into the join.
>>>
>>> Kenn
>>>
>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>> You can try putting the PCollection after the flatten into the same global
>>>> window, with the same triggers as before the flattening.
>>>>
>>>> Best regards
>>>> Aleksandr Gortujev
>>>>
>>>>
>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> I am on second week of our PoC with Beam and I am really amazed by the
>>>> capabilities of the framework and how well engineered it is.
>>>>
>>>> Amazed does not mean experienced so please bear with me.
>>>>
>>>> What we try to achieve is to join several streams using windowing and
>>>> triggers. And that is where I fear we hit the limitations for what can be
>>>> done.
>>>>
>>>> In case A we run in global windows and we are able to combine two
>>>> unbounded PCollections, but when I try to combine the results with the third
>>>> collection I get the exception below. I tried many different trigger
>>>> combinations, but can't make it work.
>>>>
>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>>> Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>> seconds))
>>>>
>>>> In case B I use fixed windows. Again, I can successfully join two
>>>> collections and print output in the console. When I add the third it runs
>>>> without errors, but I am not able to materialize results in the console.
>>>> Although I am able to print results of merge using Flatten so the error
>>>> above is no longer an issue.
>>>>
>>>> Does anyone have experience joining three or more unbounded
>>>> PCollections? What would a successful windowing and triggering strategy be
>>>> for global or fixed windows, respectively?
>>>>
>>>> Below code snippets from fixed windows case. Windows are defined in the
>>>> same manner for all three collections, customer, claim and policy. The
>>>> Join class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>
>>>>
>>>> I would be really grateful if any of you would share your
>>>> knowledge.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>> PCollection<Claim2> claimInput = pipeline
>>>>
>>>>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>                 .withoutMetadata())
>>>>         .apply(Values.<String> create())
>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>>  /**JOIN**************/
>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>>>>
>>>> PCollection<KV<Integer,String>> merged= collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>>
>>>>
>>>>
>>>
>>

Re: IllegalStateException when combining 3 streams?

Posted by Ben Chambers <bc...@apache.org>.
It looks like this is a problematic interaction between the 2-layer join
and the (unfortunately complicated) continuation triggering. Specifically,
the triggering of Join(A, B) has the continuation trigger, so it isn't
possible to join that with C.

Instead of trying to do Join(Join(A, B), C), consider using a CoGroupByKey.
This will allow you to join all three input collections at the same time,
which should have two benefits. First, it will work since you won't be
trying to merge a continuation trigger with the original trigger. Second,
it should be more efficient, because you are performing a single, three-way
join instead of two, two-way joins.

https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html for more information.
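
Roughly, a three-way CoGbk looks like the sketch below (the tag names are
made up here, and I'm assuming the keyed collections are the
KV<Integer,String> ones from your snippets):

final TupleTag<String> customerTag = new TupleTag<>();
final TupleTag<String> policyTag = new TupleTag<>();
final TupleTag<String> claimTag = new TupleTag<>();

// Group all three inputs by key in one shot.
PCollection<KV<Integer, CoGbkResult>> grouped =
    KeyedPCollectionTuple.of(customerTag, all_customers)
        .and(policyTag, all_policies)
        .and(claimTag, all_claims)
        .apply(CoGroupByKey.<Integer>create());
// A single DoFn over the CoGbkResult can then emit the three-way joined
// records by calling getAll(tag) for each side.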

-- Ben

On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <ar...@gmail.com> wrote:

> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>
> So there are two scenarios that I've been trying out, but the operation is
> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
> In scenario 1 I define three PCollections with global windows and exactly
> the same triggers. Now I am able to join A+B, but as soon as I try to join AB+C
> I get the exception. Here is the code snippet where the window and triggers are the same for
> all three PCollections using global windows:
>
> Pipeline pipeline = Pipeline.create(options);
>
>  Trigger trigger1 =
>          AfterProcessingTime
>                  .pastFirstElementInPane()
>                  .plusDelayOf(Duration.standardSeconds(10))
>          ;
>  /**PARSE CUSTOMER*/
>  PCollection<Customer3> customerInput = pipeline
>
>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
>
>
>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>                  .withKeyDeserializer(StringDeserializer.class)
>                  .withValueDeserializer(StringDeserializer.class))
>                  .withoutMetadata())
>          .apply(Values.<String> create())
>
>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>  .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
>      trigger1 ))
>          .accumulatingFiredPanes())
>          ;
>  /**PARSE POLICY*/
>  PCollection<Policy2> policyInput = pipeline
>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>                  .withKeyDeserializer(StringDeserializer.class)
>                  .withValueDeserializer(StringDeserializer.class))
>                  // .withWatermarkFn(new AddWatermarkFn())
>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>                  .withoutMetadata())
>          .apply(Values.<String> create())
>          //.apply(ParseJsons.of(Customer.class));
>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
>                  trigger1))
>                  .accumulatingFiredPanes())
>          ;
> /**PARSE CLAIM**/
>
>  PCollection<Claim2> claimInput = pipeline
>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>                  .withKeyDeserializer(StringDeserializer.class)
>                  .withValueDeserializer(StringDeserializer.class))
>                  .withoutMetadata())
>          .apply(Values.<String> create())
>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>
>          .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
>                  .accumulatingFiredPanes())
>          ;
>
>  /**CUSTOMER  ********/
>  PCollection<KV<Integer,String>> all_customers = customerInput
>          .apply(new ExtractAndMapCustomerKey())
>          ;
>  /***POLICY********/
>  PCollection<KV<Integer,String>> all_policies = policyInput
>          .apply(new ExtractAndMapPolicyKey())
>          ;
>  /***CLAIM*******/
>  PCollection<KV<Integer,String>> all_claims = claimInput
>          .apply(new ExtractAndMapClaimKey())
>          ;
>  /**JOIN**************/
>  /**This join works if I comment out the subsequent join**/
>
>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>
>  /**this join will cause IllegalStateException **/
>
>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>
>
>
> The second scenario uses fixed windows. The logic is the same as
> above. Even in this case, triggering is equal for all three collections, like
> this:
>
> .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>         .withAllowedLateness(Duration.standardSeconds(1)));
>
> /**JOIN**************/
> /**NO ERROR this time **/
>
> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>   Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>
>
> This time I get no errors, but I am not able to print the results in the
> console.
>
> So, I make another experiment and again define an equal trigger for all three
> collections. I can print the output to the console for the first two
> PCollections, but the second join again fails with IllegalStateException.
> Triggering definition for all three collections:
>
> /**PARSE CLAIM**/
>
>  PCollection<Claim2> claimInput = pipeline
>
>          .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>                  .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>                  .withKeyDeserializer(StringDeserializer.class)
>                  .withValueDeserializer(StringDeserializer.class))
>                  .withoutMetadata())
>          .apply(Values.<String> create())
>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>
>          .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
>                  .triggering(AfterWatermark.pastEndOfWindow()
>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                                  //early update frequency
>                                  .alignedTo(Duration.standardSeconds(10)))
>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>                  .withAllowedLateness(Duration.standardMinutes(5))
>                  .accumulatingFiredPanes());
>
>
>
> In this case I've added early and late firings, which emit results from
> the pane, but the second join again throws the exception.
>
> I know it's a lot of information to take in but basically if you have an
> example where you join three PCollections in global and in fixed windows
> with appropriate triggering, I'd be eternally grateful:)
> Or if you could explain how to do it, of course. Thanks in advance.
>
> Best Regards
> Artur
>
>
>
>
>
> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com> wrote:
>
>> Hi Artur,
>>
>> When you join the PCollections, they will be flattened and go through a
>> GroupByKey together. Since the trigger governs when the GroupByKey can emit
>> output, the triggers have to be equal or the GroupByKey doesn't have a
>> clear guide as to when it should output. If you can make the triggering on
>> all the input collections equal, that will resolve this issue. If you still
>> need different triggering elsewhere, that is fine. You just need to make
>> them the same going into the join.
>>
>> Kenn
>>
>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com> wrote:
>>
>>> Hello,
>>> You can try putting the PCollection after the flatten into the same global
>>> window, with the same triggers as before the flattening.
>>>
>>> Best regards
>>> Aleksandr Gortujev
>>>
>>>
>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>>>
>>> Hi,
>>> I am on second week of our PoC with Beam and I am really amazed by the
>>> capabilities of the framework and how well engineered it is.
>>>
>>> Amazed does not mean experienced so please bear with me.
>>>
>>> What we try to achieve is to join several streams using windowing and
>>> triggers. And that is where I fear we hit the limitations for what can be
>>> done.
>>>
>>> In case A we run in global windows and we are able to combine two
>>> unbounded PCollections, but when I try to combine the results with the third
>>> collection I get the exception below. I tried many different trigger
>>> combinations, but can't make it work.
>>>
>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>>> Flatten had incompatible triggers:
>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>> seconds))
>>>
>>> In case B I use fixed windows. Again, I can successfully join two
>>> collections and print output in the console. When I add the third it runs
>>> without errors, but I am not able to materialize results in the console.
>>> Although I am able to print results of merge using Flatten so the error
>>> above is no longer an issue.
>>>
>>> Does anyone have experience joining three or more unbounded PCollections?
>>> What would a successful windowing and triggering strategy be for global or
>>> fixed windows, respectively?
>>>
>>> Below code snippets from fixed windows case. Windows are defined in the
>>> same manner for all three collections, customer, claim and policy. The
>>> Join class I use comes from
>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>
>>>
>>> I would be really grateful if any of you would share your
>>> knowledge.
>>>
>>> Best Regards
>>> Artur
>>>
>>> PCollection<Claim2> claimInput = pipeline
>>>
>>>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>                 .withValueDeserializer(StringDeserializer.class))
>>>                 .withoutMetadata())
>>>         .apply(Values.<String> create())
>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>
>>>  /**JOIN**************/
>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>>>
>>> PCollection<KV<Integer,String>> merged= collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>
>>>
>>>
>>
>

Re: IllegalStateException when combining 3 streams?

Posted by Artur Mrozowski <ar...@gmail.com>.
Hi Kenneth and Aleksandr, and thank you for your prompt answers.

So there are two scenarios that I've been trying out, but the operation is
always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
In scenario 1 I define three PCollections with global windows and exactly
the same triggers. Now I am able to join A+B, but as soon as I try to join AB+C
I get the exception. Here is the code snippet where the window and triggers are the same for
all three PCollections using global windows:

Pipeline pipeline = Pipeline.create(options);

 Trigger trigger1 =
         AfterProcessingTime
                 .pastFirstElementInPane()
                 .plusDelayOf(Duration.standardSeconds(10))
         ;
 /**PARSE CUSTOMER*/
 PCollection<Customer3> customerInput = pipeline
         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(customerTopic))
                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                 .withKeyDeserializer(StringDeserializer.class)
                 .withValueDeserializer(StringDeserializer.class))
                 .withoutMetadata())
         .apply(Values.<String> create())
         .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
         .apply(Window.<Customer3> into(new GlobalWindows()).triggering(Repeatedly.forever(
                 trigger1))
                 .accumulatingFiredPanes())
         ;
 /**PARSE POLICY*/
 PCollection<Policy2> policyInput = pipeline
         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(policyTopic))
                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                 .withKeyDeserializer(StringDeserializer.class)
                 .withValueDeserializer(StringDeserializer.class))
                 // .withWatermarkFn(new AddWatermarkFn())
                 //.withTimestampFn2(new AddCustomerTimestampFn())
                 .withoutMetadata())
         .apply(Values.<String> create())
         //.apply(ParseJsons.of(Customer.class));
         .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
         .apply(Window.<Policy2> into(new GlobalWindows()).triggering(Repeatedly.forever(
                 trigger1))
                 .accumulatingFiredPanes())
         ;
/**PARSE CLAIM**/
 PCollection<Claim2> claimInput = pipeline
         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                 .withKeyDeserializer(StringDeserializer.class)
                 .withValueDeserializer(StringDeserializer.class))
                 .withoutMetadata())
         .apply(Values.<String> create())
         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
         .apply(Window.<Claim2> into(new GlobalWindows()).triggering(trigger1)
                 .accumulatingFiredPanes())
         ;

 /**CUSTOMER  ********/
 PCollection<KV<Integer,String>> all_customers = customerInput
         .apply(new ExtractAndMapCustomerKey())
         ;
 /***POLICY********/
 PCollection<KV<Integer,String>> all_policies = policyInput
         .apply(new ExtractAndMapPolicyKey())
         ;
 /***CLAIM*******/
 PCollection<KV<Integer,String>> all_claims = claimInput
         .apply(new ExtractAndMapClaimKey())
         ;
 /**JOIN**************/
 /**This join works if I comment out the subsequent join**/
 PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
 /**this join will cause IllegalStateException **/
 PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
    Join.innerJoin(joinedCustomersAndPolicies,all_claims);



The second scenario uses fixed windows. The logic is the same as above.
Even in this case, triggering is equal for all three collections, like this:

.apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
        .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(1)));

/**JOIN**************/
/**NO ERROR this time **/
PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
  Join.innerJoin(joinedCustomersAndPolicies,all_claims);


This time I get no errors, but I am not able to print the results in the
console.

So, I make another experiment and again define an equal trigger for all three
collections. I can print the output to the console for the first two
PCollections, but the second join again fails with IllegalStateException.
Triggering definition for all three collections:

/**PARSE CLAIM**/
 PCollection<Claim2> claimInput = pipeline
         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                 .withKeyDeserializer(StringDeserializer.class)
                 .withValueDeserializer(StringDeserializer.class))
                 .withoutMetadata())
         .apply(Values.<String> create())
         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(60)))
                 .triggering(AfterWatermark.pastEndOfWindow()
                         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                 //early update frequency
                                 .alignedTo(Duration.standardSeconds(10)))
                         .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
                 .withAllowedLateness(Duration.standardMinutes(5))
                 .accumulatingFiredPanes());



In this case I've added early and late firings, which emit results from
the pane, but the second join again throws the exception.

I know it's a lot of information to take in but basically if you have an
example where you join three PCollections in global and in fixed windows
with appropriate triggering, I'd be eternally grateful:)
Or if you could explain how to do it, of course. Thanks in advance.

Best Regards
Artur





On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <kl...@google.com> wrote:

> Hi Artur,
>
> When you join the PCollections, they will be flattened and go through a
> GroupByKey together. Since the trigger governs when the GroupByKey can emit
> output, the triggers have to be equal or the GroupByKey doesn't have a
> clear guide as to when it should output. If you can make the triggering on
> all the input collections equal, that will resolve this issue. If you still
> need different triggering elsewhere, that is fine. You just need to make
> them the same going into the join.
>
> Kenn
>
> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com> wrote:
>
>> Hello,
>> You can try putting the PCollection after the flatten into the same global
>> window, with the same triggers as before the flattening.
>>
>> Best regards
>> Aleksandr Gortujev
>>
>>
>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>>
>> Hi,
>> I am on second week of our PoC with Beam and I am really amazed by the
>> capabilities of the framework and how well engineered it is.
>>
>> Amazed does not mean experienced so please bear with me.
>>
>> What we try to achieve is to join several streams using windowing and
>> triggers. And that is where I fear we hit the limitations for what can be
>> done.
>>
>> In case A we run in global windows and we are able to combine two
>> unbounded PCollections, but when I try to combine the results with the third
>> collection I get the exception below. I tried many different trigger
>> combinations, but can't make it work.
>>
>> Exception in thread "main" java.lang.IllegalStateException: Inputs to
>> Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>> seconds))
>>
>> In case B I use fixed windows. Again, I can successfully join two
>> collections and print output in the console. When I add the third it runs
>> without errors, but I am not able to materialize results in the console.
>> Although I am able to print results of merge using Flatten so the error
>> above is no longer an issue.
>>
>> Does anyone have experience joining three or more unbounded PCollections?
>> What would a successful windowing and triggering strategy be for global or
>> fixed windows, respectively?
>>
>> Below code snippets from fixed windows case. Windows are defined in the
>> same manner for all three collections, customer, claim and policy. The
>> Join class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>
>>
>> I would be really grateful if any of you would share your
>> knowledge.
>>
>> Best Regards
>> Artur
>>
>> PCollection<Claim2> claimInput = pipeline
>>
>>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class))
>>                 .withoutMetadata())
>>         .apply(Values.<String> create())
>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>
>>  /**JOIN**************/
>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>>
>> PCollection<KV<Integer,String>> merged= collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>
>>
>>
>

Re: IllegalStateException when combining 3 streams?

Posted by Kenneth Knowles <kl...@google.com>.
Hi Artur,

When you join the PCollections, they will be flattened and go through a
GroupByKey together. Since the trigger governs when the GroupByKey can emit
output, the triggers have to be equal or the GroupByKey doesn't have a
clear guide as to when it should output. If you can make the triggering on
all the input collections equal, that will resolve this issue. If you still
need different triggering elsewhere, that is fine. You just need to make
them the same going into the join.
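
For example, something like this sketch (the window and trigger values are
illustrative; the point is that the identical transform goes onto each
input before the join):

// One windowing strategy, defined once and applied to every join input.
Window<KV<Integer, String>> joinWindowing =
    Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(100)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardSeconds(1))
        .accumulatingFiredPanes();

PCollection<KV<Integer, String>> windowedCustomers = all_customers.apply("WindowCustomers", joinWindowing);
PCollection<KV<Integer, String>> windowedPolicies = all_policies.apply("WindowPolicies", joinWindowing);
PCollection<KV<Integer, String>> windowedClaims = all_claims.apply("WindowClaims", joinWindowing);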

Kenn

On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <al...@gmail.com> wrote:

> Hello,
> You can try putting the PCollection after the flatten into the same global
> window, with the same triggers as before the flattening.
>
> Best regards
> Aleksandr Gortujev
>
>
> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:
>
> Hi,
> I am on second week of our PoC with Beam and I am really amazed by the
> capabilities of the framework and how well engineered it is.
>
> Amazed does not mean experienced so please bear with me.
>
> What we try to achieve is to join several streams using windowing and
> triggers. And that is where I fear we hit the limitations for what can be
> done.
>
> In case A we run in global windows and we are able to combine two
> unbounded PCollections, but when I try to combine the results with the third
> collection I get the exception below. I tried many different trigger
> combinations, but can't make it work.
>
> Exception in thread "main" java.lang.IllegalStateException: Inputs to
> Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
> seconds))
>
> In case B I use fixed windows. Again, I can successfully join two
> collections and print output in the console. When I add the third it runs
> without errors, but I am not able to materialize results in the console.
> Although I am able to print results of merge using Flatten so the error
> above is no longer an issue.
>
> Does anyone have experience joining three or more unbounded PCollections?
> What would a successful windowing and triggering strategy be for global or
> fixed windows, respectively?
>
> Below code snippets from fixed windows case. Windows are defined in the
> same manner for all three collections, customer, claim and policy. The
> Join class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>
>
> I would be really grateful if any of you would share your
> knowledge.
>
> Best Regards
> Artur
>
> PCollection<Claim2> claimInput = pipeline
>
>         .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
>                 .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class))
>                 .withoutMetadata())
>         .apply(Values.<String> create())
>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>         .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
>                 .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>                 .withAllowedLateness(Duration.standardSeconds(1)));
>
>  /**JOIN**************/
>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
> PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>  //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
>
> PCollection<KV<Integer,String>> merged= collections.apply(Flatten.<KV<Integer,String>>pCollections());
>
>
>

Re: IllegalStateException when combining 3 streams?

Posted by Aleksandr <al...@gmail.com>.
Hello,
You can try putting the PCollection after the flatten into the same global
window, with the same triggers as before the flattening.

Best regards
Aleksandr Gortujev
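
For example, something like this sketch (I am reusing the 10-second
processing-time trigger from your exception message):

PCollection<KV<Integer, String>> merged =
    collections.apply(Flatten.<KV<Integer, String>>pCollections())
        // Re-apply the pre-flatten windowing so the downstream GroupByKey
        // sees one well-defined trigger again.
        .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(10))))
                .accumulatingFiredPanes());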


On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <artmro@gmail.com> wrote:

Hi,
I am on second week of our PoC with Beam and I am really amazed by the
capabilities of the framework and how well engineered it is.

Amazed does not mean experienced so please bear with me.

What we try to achieve is to join several streams using windowing and
triggers. And that is where I fear we hit the limitations for what can be
done.

In case A we run in global windows and we are able to combine two unbounded
PCollections, but when I try to combine the results with the third collection I
get the exception below. I tried many different trigger combinations, but
can't make it work.

Exception in thread "main" java.lang.IllegalStateException: Inputs to
Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
seconds))

In case B I use fixed windows. Again, I can successfully join two
collections and print output in the console. When I add the third it runs
without errors, but I am not able to materialize results in the console.
Although I am able to print results of merge using Flatten so the error
above is no longer an issue.

Does anyone have experience joining three or more unbounded PCollections?
What would a successful windowing and triggering strategy be for global or
fixed windows, respectively?

Below code snippets from fixed windows case. Windows are defined in the
same manner for all three collections, customer, claim and policy. The Join
class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java


I would be really grateful if any of you would share your knowledge.

Best Regards
Artur

PCollection<Claim2> claimInput = pipeline

        .apply((KafkaIO.<String, String> read().withTopics(ImmutableList.of(claimTopic))
                .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
                .withoutMetadata())
        .apply(Values.<String> create())
        .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
        .apply(Window.<Claim2> into(FixedWindows.of(Duration.standardSeconds(100)))
                .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(1)));

 /**JOIN**************/
 PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies).and(all_claims);
 PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
    Join.innerJoin(joinedCustomersAndPolicies,all_claims);
 //PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);

PCollection<KV<Integer,String>> merged= collections.apply(Flatten.<KV<Integer,String>>pCollections());