Posted to user@beam.apache.org by Somnath Chouwdhury <so...@datametica.com> on 2023/02/21 19:31:12 UTC

BatchUpdateException while trying to use WriteToJdbc

Hi team,

We are facing an issue while trying to push data to an RDBMS (Oracle in
our case). The pipeline runs fine for a small number of records, but when
it is run on a bigger dataset it fails with this error:

Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.sql.BatchUpdateException: IO Error: Connection reset by peer
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
    org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
    org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
    org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
    oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
    oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
    oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
    oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
    oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
    org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
    org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
    org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
    org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
    Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]]
        at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
        at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]
        at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
        at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
        ... 27 more
    Caused by: java.sql.SQLRecoverableException: Closed Connection
        at oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
        at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
        at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
        at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
        at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
        at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
        ... 28 more


Here is the code snippet we are using:

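import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc

# Register a RowCoder so the rows can cross into the Java JdbcIO transform.
coders.registry.register_coder(self.ExampleRow, coders.RowCoder)

data = (
    p_input
    # AddColumns is our own DoFn; it emits dicts keyed by the row's field names.
    | beam.ParDo(AddColumns(self.ExampleRow._fields))
    | beam.Map(lambda x: self.ExampleRow(**x)).with_output_types(self.ExampleRow)
    | "Write to RDBMS" >> WriteToJdbc(
        table_name=self.task["tablename"],
        driver_class_name=self.task["driver_class_name"],
        jdbc_url=self.task["jdbc_url"],
        username=self.task["username"],
        password=self.task["password"],
        classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"],
    )
)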


How do we use Dataflow to push bulk data in batch/streaming mode?

Thanks,
Somnath Chouwdhury.

Re: BatchUpdateException while trying to use WriteToJdbc

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
When you run larger workloads, Dataflow likely divides the work into more
splits and may also autoscale to add more workers, so the number of
parallel connections to the database goes up with the size of the
workload. Try adjusting the database settings to allow more parallel
connections.

If that is not possible, you can try using the following pipeline options
to keep the number of workers and worker threads low.

'max_num_workers': caps the number of workers of the pipeline. [1]
'number_of_worker_harness_threads': caps the number of worker threads in a
single worker VM (try setting this to 1). [2]
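As a rough sketch of how these two options could be passed from Python: the helper names below are hypothetical (they are not part of Beam), and the connection estimate assumes at most one JDBC connection per worker thread, which is only an illustration. The real number depends on how the runner schedules the write step.

```python
def throttled_worker_flags(max_workers, threads_per_worker=1):
    """Build pipeline flags that cap workers and threads per worker.

    Hypothetical helper: only the two flag names come from the
    pipeline options referenced in this thread.
    """
    if max_workers < 1 or threads_per_worker < 1:
        raise ValueError("both limits must be at least 1")
    return [
        f"--max_num_workers={max_workers}",
        f"--number_of_worker_harness_threads={threads_per_worker}",
    ]


def estimated_max_connections(max_workers, threads_per_worker=1):
    # Assumption (not confirmed in this thread): each worker thread
    # holds at most one database connection at a time.
    return max_workers * threads_per_worker


flags = throttled_worker_flags(max_workers=4, threads_per_worker=1)
print(flags)
print(estimated_max_connections(max_workers=4, threads_per_worker=1))
```

The resulting list can be passed to PipelineOptions(flags) from apache_beam.options.pipeline_options, alongside the usual Dataflow flags (runner, project, region and so on). Comparing the estimate with the connection limit configured on the database gives a first idea of whether the caps are low enough.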

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/apache_beam/options/pipeline_options.py#L938
[2]
https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/apache_beam/options/pipeline_options.py#L1116



Re: BatchUpdateException while trying to use WriteToJdbc

Posted by Varun Rauthan <va...@datametica.com>.
Hello Cham,

The runner in use is the Dataflow runner.
The last 28 lines of the stack trace aren't available in Cloud Logging
either.

The code shared above works just fine with 2-3 records but starts to fail
when we try a bigger source data payload.
Does it look like multiple threads are trying to acquire a write lock on
the DB table (an Oracle table)?

*Thanks and Regards,*

*Varun Rauthan*





Re: BatchUpdateException while trying to use WriteToJdbc

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
Which runner are you using?

Also, do you have the bottom of the stack trace? The failure is possibly
due to the Docker containers running the Java SDK not having access to
your database, but I'm not sure based on the information provided.

Thanks,
Cham

On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <
somnath.c@datametica.com> wrote:

> Hii team,
>
> We are facing an issue while trying to push data to RDBMS(oracle in our
> case) while it runs for small amount of records but when is run through
> bigger dataset it fails, throwing this error,
>
> Error message from worker: org.apache.beam.sdk.util.UserCodeException:
>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
>> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]]
>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>     at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>     at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]
>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>     ... 27 more
>> Caused by: java.sql.SQLRecoverableException: Closed Connection
>>     at oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>>     at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>>     at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>>     at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
>>     at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>>     at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>>     at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>     ... 28 more
>
>
> Here is the code snippet we are using
>
> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>
> data = (
>     p_input
>     | beam.ParDo(AddColumns(self.ExampleRow._fields))
>     | beam.Map(lambda x: self.ExampleRow(**x)).with_output_types(self.ExampleRow)
>     | "Write to RDBMS" >> WriteToJdbc(
>         table_name=self.task["tablename"],
>         driver_class_name=self.task["driver_class_name"],
>         jdbc_url=self.task["jdbc_url"],
>         username=self.task["username"],
>         password=self.task["password"],
>         classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"],
>     )
> )
>
>
> How do we use Dataflow to push bulk data in batch/streaming?
>
> Thanks,
> Somnath Chouwdhury.
>