You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Alexey Romanenko (Jira)" <ji...@apache.org> on 2021/12/30 16:34:00 UTC
[jira] [Updated] (BEAM-13584) Add "Write.withResults()" for all Java SDK IOs
[ https://issues.apache.org/jira/browse/BEAM-13584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Romanenko updated BEAM-13584:
------------------------------------
Description:
Currently, by default, a "Write" transform is considered as a final {{PTransform}} in a pipeline and it returns {{PDone}} that can't be used later for next pipeline stages.
Though, sometimes it's needed to use the results of write operation as input {{PCollection}} downstream of pipeline and some IOs already support this. For example, {{JdbcIO.Write}} has the methods {{withResults()}} and {{withWriteResults()}} that returns a {{PTransform}} with output {{PCollection<>}} of write results and it can be used to analyse the results or, along with {{Wait}} transform, to write into the second database only if writes into the first one were finished, like this:
{code}
PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
.withDataSourceConfiguration(CONF_DB_1).withResults());
data.apply(Wait.on(firstWriteResults))
.apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
{code}
> Add "Write.withResults()" for all Java SDK IOs
> ----------------------------------------------
>
> Key: BEAM-13584
> URL: https://issues.apache.org/jira/browse/BEAM-13584
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Reporter: Alexey Romanenko
> Priority: P2
>
> Currently, by default, a "Write" transform is considered as a final {{PTransform}} in a pipeline and it returns {{PDone}} that can't be used later for next pipeline stages.
> Though, sometimes it's needed to use the results of write operation as input {{PCollection}} downstream of pipeline and some IOs already support this. For example, {{JdbcIO.Write}} has the methods {{withResults()}} and {{withWriteResults()}} that returns a {{PTransform}} with output {{PCollection<>}} of write results and it can be used to analyse the results or, along with {{Wait}} transform, to write into the second database only if writes into the first one were finished, like this:
> {code}
> PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
> .withDataSourceConfiguration(CONF_DB_1).withResults());
> data.apply(Wait.on(firstWriteResults))
> .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)