Posted to user@beam.apache.org by Juan Cuzmar <jc...@protonmail.com> on 2023/04/22 05:35:06 UTC

Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

I hope you all are doing well. I am facing an issue with an Apache Beam pipeline that gets stuck indefinitely when using the Wait.on transform alongside JdbcIO. Here's a simplified version of my code, focusing on the relevant parts:

PCollection<String> result = p.
    apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
    .apply("Transform", ParDo.of(new MyTransformer()));

PCollection<Void> insert = result.apply("Inserting",
    JdbcIO.<String>writeVoid()
        .withDataSourceProviderFn(/*...*/)
        .withStatement(/*...*/)
        .withPreparedStatementSetter(/*...*/)
);

result.apply(Wait.on(insert))
    .apply("Selecting", new SomeTransform())
    .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
p.run();

In the code, I'm using the Wait.on transform to make the pipeline wait until the insert transform (which uses JdbcIO to write data) is completed before executing the next steps. However, the pipeline gets stuck and doesn't progress further.

I've tried adding logging messages in my transforms to track the progress and identify where it's getting stuck, but I haven't been able to pinpoint the issue. I've searched for solutions online, but none of them provided a successful resolution for my problem.

Can anyone provide any insights or suggestions on how to debug and resolve this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?

You can find the sample code at: https://github.com/j1cs/app-beam

Thank you for your help and support.

Best regards,

Juan Cuzmar.

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Juan Cuzmar <jc...@protonmail.com>.
I did the following test and it inserted data correctly, but when I try to pull the data it does not arrive.
        Pipeline p = Pipeline.create(options);

        Coder<String> utf8Coder = StringUtf8Coder.of();
        Coder<Map<String, String>> mapCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
        Coder<KV<String, Map<String, String>>> kvCoder = KvCoder.of(utf8Coder, mapCoder);

        TestStream<KV<String, Map<String, String>>> testStream = TestStream.create(kvCoder)
                .addElements(
                        TimestampedValue.of(KV.of("event0", Collections.singletonMap("test", "test")), new Instant(0 * 1000)),
                        TimestampedValue.of(KV.of("event1", Collections.singletonMap("test", "test")), new Instant(5 * 1000)),
                        TimestampedValue.of(KV.of("event2", Collections.singletonMap("test", "test")), new Instant(10 * 1000)),
                        TimestampedValue.of(KV.of("event3", Collections.singletonMap("test", "test")), new Instant(15 * 1000)))
                .advanceWatermarkTo(new Instant(20 * 1000))
                .advanceWatermarkToInfinity();
        PCollection<KV<String, Map<String, String>>> simulatedPubsubEvents = p.apply("TestStream", testStream);
        // Build on the PCollection created above; applying "TestStream" to the pipeline a second time
        // would add a duplicate source under the same transform name.
        PCollection<String> result = simulatedPubsubEvents
                //.apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
                //.apply("Transform", ParDo.of(new MyTransformer()))
                .apply("Transform", ParDo.of(new NewTransform()))
                .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
                        .withAllowedLateness(Duration.standardMinutes(1))
                        .discardingFiredPanes());

        PCollection<Void> insert = result.apply("Inserting",
                JdbcIO.<String>write()
                        .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
                        .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
                        .withPreparedStatementSetter((element, preparedStatement) -> {
                            log.info("Preparing statement to insert");
                            preparedStatement.setString(1, element);
                        })
                        .withResults()
        );
        result.apply(Wait.on(insert))
                .apply("Selecting", new SomeTransform())
                .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
        p.run();
As far as I can see, I have to develop with TestStream and use Pub/Sub in production?
That is a very complicated approach, in my opinion.

------- Original Message -------
On Saturday, April 22nd, 2023 at 1:39 PM, Reuven Lax via user <us...@beam.apache.org> wrote:


> Oh - in that case it's possible that the problem may be the direct runner's implementation of the pubsub source - especially the watermark. For a direct-runner test, I recommend using TestStream (which allows you to advance the watermark manually, so you can test windowing).
> 
> On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jc...@protonmail.com> wrote:
> 
> > I'm developing with the direct runner, but the pipeline should run on Dataflow when deployed.
> > 
> > 
> > -------- Original Message --------
> > On Apr 22, 2023, 13:13, Reuven Lax via user < user@beam.apache.org> wrote:
> > 
> > > 
> > > What runner are you using to run this pipeline?
> > > 
> > > On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jc...@protonmail.com> wrote:
> > > 
> > > > Same result:
> > > > PCollection<String> result = p
> > > > .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
> > > > .apply("Transform", ParDo.of(new MyTransformer()))
> > > > .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
> > > > .triggering(AfterWatermark.pastEndOfWindow()
> > > > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
> > > > .withAllowedLateness(Duration.standardMinutes(1))
> > > > .discardingFiredPanes());
> > > > 
> > > > PCollection<Void> insert = result.apply("Inserting",
> > > > JdbcIO.<String>write()
> > > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > > > .withPreparedStatementSetter((element, preparedStatement) -> {
> > > > log.info("Preparing statement to insert");
> > > > preparedStatement.setString(1, element);
> > > > })
> > > > .withResults()
> > > > );
> > > > result.apply(Wait.on(insert))
> > > > .apply("Selecting", new SomeTransform())
> > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > p.run();
> > > > 
> > > > I updated the GitHub repo as well.
> > > > 
> > > > ------- Original Message -------
> > > > On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <us...@beam.apache.org> wrote:
> > > > 
> > > > 
> > > > > The other problem you have here is that you have not set a window. Wait.on waits for the end of the current window before triggering. The default Window is the GlobalWindow, so as written Wait.on will wait for the end of time (or until you drain the pipeline, which will also trigger the GlobalWindow).
> > > > > Try adding a 1-minute fixed window to the results you read from PubSub.
> > > > >
> > > > > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jc...@protonmail.com> wrote:
> > > > >
> > > > > > writeVoid() and write() plus withResults() return the same PCollection<Void>, AFAIK. In any case, I updated the code and the same thing happens:
> > > > > >
> > > > > > PCollection<String> result = p.
> > > > > > apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s", options.getProjectId(), subscription)))
> > > > > > .apply("Transform", ParDo.of(new MyTransformer()));
> > > > > >
> > > > > > PCollection<Void> insert = result.apply("Inserting",
> > > > > > JdbcIO.<String>write()
> > > > > > .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
> > > > > > .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
> > > > > > .withPreparedStatementSetter((element, preparedStatement) -> {
> > > > > > log.info("Preparing statement to insert");
> > > > > > preparedStatement.setString(1, element);
> > > > > > })
> > > > > > .withResults()
> > > > > > );
> > > > > > result.apply(Wait.on(insert))
> > > > > > .apply("Selecting", new SomeTransform())
> > > > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
> > > > > >
> > > > > > ------- Original Message -------
> > > > > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <us...@beam.apache.org> wrote:
> > > > > >
> > > > > >
> > > > > > > I believe you have to call withResults() on the JdbcIO transform in order for this to work.
> > > > > > >
> > > > > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jc...@protonmail.com> wrote:
> > > > > > >
> > > > > > > > I hope you all are doing well. I am facing an issue with an Apache Beam pipeline that gets stuck indefinitely when using the Wait.on transform alongside JdbcIO. Here's a simplified version of my code, focusing on the relevant parts:
> > > > > > > >
> > > > > > > > PCollection<String> result = p.
> > > > > > > > apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > > > > > > .apply("Transform", ParDo.of(new MyTransformer()));
> > > > > > > >
> > > > > > > > PCollection<Void> insert = result.apply("Inserting",
> > > > > > > > JdbcIO.<String>writeVoid()
> > > > > > > > .withDataSourceProviderFn(/*...*/)
> > > > > > > > .withStatement(/*...*/)
> > > > > > > > .withPreparedStatementSetter(/*...*/)
> > > > > > > > );
> > > > > > > >
> > > > > > > > result.apply(Wait.on(insert))
> > > > > > > > .apply("Selecting", new SomeTransform())
> > > > > > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > > > > > > p.run();
> > > > > > > >
> > > > > > > > In the code, I'm using the Wait.on transform to make the pipeline wait until the insert transform (which uses JdbcIO to write data) is completed before executing the next steps. However, the pipeline gets stuck and doesn't progress further.
> > > > > > > >
> > > > > > > > I've tried adding logging messages in my transforms to track the progress and identify where it's getting stuck, but I haven't been able to pinpoint the issue. I've searched for solutions online, but none of them provided a successful resolution for my problem.
> > > > > > > >
> > > > > > > > Can anyone provide any insights or suggestions on how to debug and resolve this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> > > > > > > >
> > > > > > > > You can find the sample code at: https://github.com/j1cs/app-beam
> > > > > > > >
> > > > > > > > Thank you for your help and support.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Juan Cuzmar.
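
The windowing advice quoted above (Wait.on holds elements back until the corresponding window of the signal has ended, and the default GlobalWindow only ends at "the end of time") can be illustrated with a small plain-Java sketch. This is not Beam code: the class, the method names, and the use of Long.MAX_VALUE as a stand-in for the end of the global window are all assumptions made for illustration, and triggers and allowed lateness are ignored.

```java
import java.time.Duration;

/** Plain-Java illustration only; none of these names are Beam APIs. */
final class WaitOnWindowSketch {

    /** Stand-in for "the end of time" of the global window (assumption, not Beam's real value). */
    static final long END_OF_TIME_MILLIS = Long.MAX_VALUE;

    /** Start of the epoch-aligned fixed window that contains the given event timestamp. */
    static long fixedWindowStart(long timestampMillis, Duration size) {
        return Math.floorDiv(timestampMillis, size.toMillis()) * size.toMillis();
    }

    /** Exclusive end of the fixed window that contains the given event timestamp. */
    static long fixedWindowEnd(long timestampMillis, Duration size) {
        return fixedWindowStart(timestampMillis, size) + size.toMillis();
    }

    /** A window counts as complete once the watermark has reached its end. */
    static boolean isComplete(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        long elementTimestamp = 15_000L; // event time: 15 s
        long watermark = 61_000L;        // watermark: 61 s

        long end = fixedWindowEnd(elementTimestamp, oneMinute);
        System.out.println("fixed window end: " + end);
        System.out.println("fixed window complete: " + isComplete(end, watermark));
        System.out.println("global window complete: " + isComplete(END_OF_TIME_MILLIS, watermark));
    }
}
```

With a one-minute fixed window the signal for the element at 15 s is complete as soon as the watermark passes 60 s, whereas the global-window check stays false for any finite watermark, which matches the "stuck indefinitely" symptom described in the original message.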

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Juan Cuzmar <jc...@protonmail.com>.
I see. If you don't mind, could you give me an example? I am not very knowledgeable in Apache Beam.


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Reuven Lax via user <us...@beam.apache.org>.
Oh - in that case it's possible that the problem may be the direct runner's
implementation of the pubsub source - especially the watermark. For a
direct-runner test, I recommend using TestStream (which allows you to
advance the watermark manually, so you can test windowing).
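
As a rough illustration of what "advancing the watermark manually" means for windowing, the following plain-Java sketch replays the schedule from the TestStream test posted in this thread: elements at 0, 5, 10 and 15 seconds, the watermark advanced to 20 seconds, then to infinity. It does not use Beam; the class and method names are hypothetical, the one-minute window size is taken from the thread, and early firings and allowed lateness are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; none of these names are Beam APIs. */
final class WatermarkReplaySketch {

    /** One-minute fixed windows, as used in the pipeline discussed in the thread. */
    static final long WINDOW_SIZE_MILLIS = 60_000L;

    /**
     * Returns the start timestamps of the fixed windows that close when the
     * watermark moves from previousWatermark to newWatermark.
     */
    static List<Long> windowsClosedBy(long[] eventTimesMillis, long previousWatermark, long newWatermark) {
        List<Long> closed = new ArrayList<>();
        for (long ts : eventTimesMillis) {
            long start = Math.floorDiv(ts, WINDOW_SIZE_MILLIS) * WINDOW_SIZE_MILLIS;
            long end = start + WINDOW_SIZE_MILLIS;
            boolean wasOpen = previousWatermark < end;
            boolean nowClosed = newWatermark >= end;
            if (wasOpen && nowClosed && !closed.contains(start)) {
                closed.add(start);
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        long[] events = {0L, 5_000L, 10_000L, 15_000L};

        // Step 1: watermark advanced to 20 s; the window [0 s, 60 s) is still open.
        System.out.println(windowsClosedBy(events, Long.MIN_VALUE, 20_000L));

        // Step 2: watermark advanced to "infinity"; the window [0 s, 60 s) closes.
        System.out.println(windowsClosedBy(events, 20_000L, Long.MAX_VALUE));
    }
}
```

The first step prints an empty list and the second prints [0]: in this schedule nothing downstream of a window-gated step can fire on the watermark until the final advance, which is the kind of behaviour a manually driven watermark makes easy to check.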


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Juan Cuzmar <jc...@protonmail.com>.
I'm developing with the direct runner, but the pipeline should run on Dataflow when deployed.


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Reuven Lax via user <us...@beam.apache.org>.
What runner are you using to run this pipeline?


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Juan Cuzmar <jc...@protonmail.com>.
Same result:
PCollection<String> result = p
        .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
                .fromSubscription(String.format("projects/%s/subscriptions/%s",
                        options.getProjectId(), subscription)))
        .apply("Transform", ParDo.of(new MyTransformer()))
        // 1-minute fixed windows, with early firings 30 seconds after the first element in a pane.
        .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(30))))
                .withAllowedLateness(Duration.standardMinutes(1))
                .discardingFiredPanes());

// withResults() makes the write return a PCollection<Void> that Wait.on can use as a signal.
PCollection<Void> insert = result.apply("Inserting",
        JdbcIO.<String>write()
                .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
                .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
                .withPreparedStatementSetter((element, preparedStatement) -> {
                    log.info("Preparing statement to insert");
                    preparedStatement.setString(1, element);
                })
                .withResults());

// Hold back `result` until the insert signal for the same window is complete.
result.apply(Wait.on(insert))
        .apply("Selecting", new SomeTransform())
        .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
p.run();

I updated the GitHub repo as well.


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Reuven Lax via user <us...@beam.apache.org>.
The other problem you have here is that you have not set a window. Wait.on
waits for the end of the current window before triggering. The default
Window is the GlobalWindow, so as written Wait.on will wait for the end of
time (or until you drain the pipeline, which will also trigger the
GlobalWindow).

Try adding a 1-minute fixed window to the results you read from PubSub.
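
As a rough illustration of the windowing point above, here is a plain-Java sketch with no Beam dependency. The class and method names (FixedWindowSketch, windowStart, windowEnd) are made up for this example and are not Beam API, and it assumes fixed windows aligned to the epoch with no offset. It shows that every element in a 1-minute fixed window belongs to a window with a definite end, which is the point the watermark has to pass before Wait.on can release that window. The GlobalWindow has no such end until the pipeline is drained.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: mimics how a fixed-size window could be assigned to a timestamp.
final class FixedWindowSketch {

    // Start (inclusive) of the fixed window that contains ts.
    // Assumption: windows are aligned to the epoch with no offset.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // End (exclusive) of the fixed window that contains ts.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        Instant element = Instant.parse("2023-04-22T05:35:06Z");

        System.out.println(windowStart(element, oneMinute)); // 2023-04-22T05:35:00Z
        System.out.println(windowEnd(element, oneMinute));   // 2023-04-22T05:36:00Z
    }
}
```

With a 1-minute window, the longest a downstream step should have to wait is roughly one window length plus watermark delay, instead of "the end of time".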


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Juan Cuzmar <jc...@protonmail.com>.
AFAIK, writeVoid() and write() followed by withResults() both return a PCollection<Void>. In any case, I updated the code and the same thing happens:

PCollection<String> result = p
        .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
                .fromSubscription(String.format("projects/%s/subscriptions/%s",
                        options.getProjectId(), subscription)))
        .apply("Transform", ParDo.of(new MyTransformer()));

PCollection<Void> insert = result.apply("Inserting",
        JdbcIO.<String>write()
                .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
                .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
                .withPreparedStatementSetter((element, preparedStatement) -> {
                    log.info("Preparing statement to insert");
                    preparedStatement.setString(1, element);
                })
                .withResults());

result.apply(Wait.on(insert))
        .apply("Selecting", new SomeTransform())
        .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63


Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

Posted by Reuven Lax via user <us...@beam.apache.org>.
I believe you have to call withResults() on the JdbcIO transform in order
for this to work.
