Posted to user@beam.apache.org by "Koffman, Noa (Nokia - IL/Kfar Sava)" <no...@nokia.com> on 2021/11/10 14:47:48 UTC

JDBC.IO multiple lines

Hi,
We are trying to use JdbcIO.readAll to query a Postgres table. We need to do some calculation on the result set and create an object from that calculation.
However, when the query runs, we get each row in a different ResultSet in a different PCollection, and we are not sure how to combine the different results (they all have the same key).

In short, the question is: how can we get a result set with the entire query results (all the rows)?

Thanks!

This is our current code:

input.apply("read from db", JdbcIO.<KV<String, Iterable<String>>, KV<String, SomeResult>>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", PipelineUtil.getDbInfo())
                .withUsername("user")
                .withPassword(PipelineUtil.readCredentials()))
        .withQuery("select *  from some_table where some_id = ? order by insert_timestamp limit 5")
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
            @Override
            public void setParameters(KV<String, Iterable<String>> element,
                                      PreparedStatement preparedStatement) throws Exception {

                String someId = element.getKey();

                preparedStatement.setString(1, someId);
                System.out.println("preparedStatement: " + preparedStatement);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<KV<String, SomeResult>>() {
            public KV<String, SomeResult> mapRow(ResultSet resultSet) throws Exception {

                System.out.println("resultSet: " + resultSet);
                String someId = resultSet.getString("some_id");

                OurObject ourObject = new OurObject (
                        resultSet.getString("col1"),
                        resultSet.getString("col2"),
                        resultSet.getFloat("col3"),
                        resultSet.getString("col4"),
                        resultSet.getLong("col5")
                );
                SomeResult someResults = new SomeResult(ourObject,
                        resultSet.getString("output_id"),
                        resultSet.getString("insert_timestamp")
                );
                System.out.println("someResults: " + someResults);

                return KV.of(someId, someResults);
            }
        }));


Re: JDBC.IO multiple lines

Posted by "Koffman, Noa (Nokia - IL/Kfar Sava)" <no...@nokia.com>.
Thanks for the quick response, I will try that

From: Luke Cwik <lc...@google.com>
Date: Wednesday, 10 November 2021 at 19:04
To: user@beam.apache.org <us...@beam.apache.org>
Subject: Re: JDBC.IO multiple lines
Getting one record at a time is expected, since the entire result set may not fit into memory and the purpose of the RowMapper is to map one row of the ResultSet at a time.

If you know that the result set is always going to be small then you can either:
1) Rewrite the query so that you get all the results as a single row (efficient but can be inconvenient depending on the query)
2) Apply a GroupByKey with a singleton key like:
p.apply(JdbcIO.readAll(...))  // PCollection<KV<String, SomeResult>>
 .apply(WithKeys.of((Void) null))     // PCollection<KV<Void, KV<String, SomeResult>>>
 .apply(GroupByKey.create())    // PCollection<KV<Void, Iterable<KV<String, SomeResult>>>>
 .apply(Values.create())  // PCollection<Iterable<KV<String, SomeResult>>>
 .apply(ParDo.of(new CombineResultsDoFn(...)));

CombineResultsDoFn is a DoFn that you would author that would get a single iterable value containing all the mapped rows from the RowMapper.




Re: JDBC.IO multiple lines

Posted by Luke Cwik <lc...@google.com>.
Getting one record at a time is expected, since the entire result set may
not fit into memory and the purpose of the RowMapper is to map one row of
the ResultSet at a time.

If you know that the result set is always going to be small then you can
either:
1) Rewrite the query so that you get all the results as a single row
(efficient but can be inconvenient depending on the query)
2) Apply a GroupByKey with a singleton key like:
p.apply(JdbcIO.readAll(...))  // PCollection<KV<String, SomeResult>>
 .apply(WithKeys.of((Void) null))     // PCollection<KV<Void, KV<String, SomeResult>>>
 .apply(GroupByKey.create())    // PCollection<KV<Void, Iterable<KV<String, SomeResult>>>>
 .apply(Values.create())  // PCollection<Iterable<KV<String, SomeResult>>>
 .apply(ParDo.of(new CombineResultsDoFn(...)));

CombineResultsDoFn is a DoFn that you would author that would get a single
iterable value containing all the mapped rows from the RowMapper.
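
For illustration only, the combining step inside such a DoFn might look like the plain-Java sketch below. It is not code from this thread and does not use Beam: Map.Entry stands in for Beam's KV, the Float value stands in for a single numeric field of SomeResult (col3, say), and the Summary class and the mean calculation are hypothetical, since the actual calculation was not described.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CombineResultsSketch {

    /** Hypothetical result object; the real calculation is not given in the thread. */
    public static final class Summary {
        public final String someId;
        public final int rowCount;
        public final double meanCol3;

        public Summary(String someId, int rowCount, double meanCol3) {
            this.someId = someId;
            this.rowCount = rowCount;
            this.meanCol3 = meanCol3;
        }
    }

    /**
     * Folds all mapped rows into one Summary.
     * Assumption: every row carries the same key, as stated in the question.
     */
    public static Summary combine(Iterable<Map.Entry<String, Float>> rows) {
        String someId = null;
        int count = 0;
        double sum = 0.0;
        for (Map.Entry<String, Float> row : rows) {
            someId = row.getKey();
            sum += row.getValue();
            count++;
        }
        // Avoid dividing by zero when the query returned no rows.
        double mean = count == 0 ? 0.0 : sum / count;
        return new Summary(someId, count, mean);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Float>> rows = new ArrayList<>();
        rows.add(new SimpleImmutableEntry<>("id-1", 1.0f));
        rows.add(new SimpleImmutableEntry<>("id-1", 2.0f));
        rows.add(new SimpleImmutableEntry<>("id-1", 3.0f));

        Summary s = combine(rows);
        System.out.println(s.someId + " " + s.rowCount + " " + s.meanCol3);
    }
}
```

Inside a DoFn, the @ProcessElement method would pass its input iterable to a helper like combine and output the returned object.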

