Posted to user@beam.apache.org by "Koffman, Noa (Nokia - IL/Kfar Sava)" <no...@nokia.com> on 2021/11/10 14:47:48 UTC
JDBC.IO multiple lines
Hi,
We are trying to use JdbcIO.readAll to query a Postgres table. We need to do some calculation on the result set and create an object from that calculation.
However, when the query runs, we get each row in a separate result set, as a separate PCollection element, and we are not sure how to combine the rows (they all have the same key).
In short, the question is: how can we get a single result set containing all the rows returned by the query?
Thanks!
This is our current code:
input.apply("read from db", JdbcIO.<KV<String, Iterable<String>>, KV<String, SomeResult>>readAll()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", PipelineUtil.getDbInfo())
        .withUsername("user")
        .withPassword(PipelineUtil.readCredentials()))
    .withQuery("select * from some_table where some_id = ? order by insert_timestamp limit 5")
    .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
        @Override
        public void setParameters(KV<String, Iterable<String>> element,
                                  PreparedStatement preparedStatement) throws Exception {
            // Bind the element's key to the single "?" placeholder in the query.
            String someId = element.getKey();
            preparedStatement.setString(1, someId);
            System.out.println("preparedStatement: " + preparedStatement);
        }
    })
    .withRowMapper(new JdbcIO.RowMapper<KV<String, SomeResult>>() {
        @Override
        public KV<String, SomeResult> mapRow(ResultSet resultSet) throws Exception {
            // Called once per row; the ResultSet is positioned on the current row.
            System.out.println("resultSet: " + resultSet);
            String someId = resultSet.getString("some_id");
            OurObject ourObject = new OurObject(
                resultSet.getString("col1"),
                resultSet.getString("col2"),
                resultSet.getFloat("col3"),
                resultSet.getString("col4"),
                resultSet.getLong("col5")
            );
            SomeResult someResults = new SomeResult(ourObject,
                resultSet.getString("output_id"),
                resultSet.getString("insert_timestamp")
            );
            System.out.println("someResults: " + someResults);
            return KV.of(someId, someResults);
        }
    }));
Re: JDBC.IO multiple lines
Posted by "Koffman, Noa (Nokia - IL/Kfar Sava)" <no...@nokia.com>.
Thanks for the quick response, I will try that
From: Luke Cwik <lc...@google.com>
Date: Wednesday, 10 November 2021 at 19:04
To: user@beam.apache.org <us...@beam.apache.org>
Subject: Re: JDBC.IO multiple lines
Getting one record at a time is as expected since the entire result set may not fit into memory and the purpose of the RowMapper is to map one row of the ResultSet at a time.
If you know that the result set is always going to be small then you can either:
1) Rewrite the query so that you get all the results as a single row (efficient but can be inconvenient depending on the query)
2) Apply a GroupByKey with a singleton key like:
p.apply(JdbcIO.readAll(...)) // PCollection<KV<String, SomeResult>>
    .apply(WithKeys.of((Void) null)) // PCollection<KV<Void, KV<String, SomeResult>>>
    .apply(GroupByKey.create()) // PCollection<KV<Void, Iterable<KV<String, SomeResult>>>>
    .apply(Values.create()) // PCollection<Iterable<KV<String, SomeResult>>>
    .apply(ParDo.of(new CombineResultsDoFn(...)));
CombineResultsDoFn is a DoFn that you would author that would get a single iterable value containing all the mapped rows from the RowMapper.
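To make option 1 concrete, here is a minimal sketch of how the query from the original post might be rewritten so that Postgres returns one row per some_id. It assumes the built-in Postgres json_agg aggregate; the class name SingleRowQuery and the column alias rows_json are hypothetical names chosen for illustration, not anything from the thread.

```java
public class SingleRowQuery {

    // Hypothetical rewrite of the original query: the inner select is unchanged,
    // and json_agg folds its (at most five) rows into one JSON array, ordered by
    // insert_timestamp. "rows_json" is an illustrative alias.
    public static final String QUERY =
        "select t.some_id, json_agg(t order by t.insert_timestamp) as rows_json "
            + "from (select * from some_table where some_id = ? "
            + "order by insert_timestamp limit 5) t "
            + "group by t.some_id";

    public static void main(String[] args) {
        // The string still has exactly one "?" placeholder, so the existing
        // parameter setter would not need to change.
        System.out.println(QUERY);
    }
}
```

With a query like this, the RowMapper would presumably read the aggregated column once (for example as a string) and parse the JSON itself, which is the inconvenience mentioned above. Note that a some_id with no matching rows yields no row at all rather than an empty array.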
Re: JDBC.IO multiple lines
Posted by Luke Cwik <lc...@google.com>.
Getting one record at a time is as expected since the entire result set may
not fit into memory and the purpose of the RowMapper is to map one row of
the ResultSet at a time.
If you know that the result set is always going to be small then you can
either:
1) Rewrite the query so that you get all the results as a single row
(efficient but can be inconvenient depending on the query)
2) Apply a GroupByKey with a singleton key like:
p.apply(JdbcIO.readAll(...)) // PCollection<KV<String, SomeResult>>
    .apply(WithKeys.of((Void) null)) // PCollection<KV<Void, KV<String, SomeResult>>>
    .apply(GroupByKey.create()) // PCollection<KV<Void, Iterable<KV<String, SomeResult>>>>
    .apply(Values.create()) // PCollection<Iterable<KV<String, SomeResult>>>
    .apply(ParDo.of(new CombineResultsDoFn(...)));
CombineResultsDoFn is a DoFn that you would author that would get a single
iterable value containing all the mapped rows from the RowMapper.
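As a rough illustration of what such a DoFn might do with the grouped rows, here is a plain-Java sketch with no Beam dependency. Everything in it is an assumption made for the example: MappedRow is a hypothetical, simplified stand-in for SomeResult, the "calculation" (latest col3 minus earliest col3) is invented because the thread does not say what is computed, and timestamps are assumed to be ISO-8601 strings that sort lexicographically. In a real pipeline the body of combine would sit inside the DoFn's @ProcessElement method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineResultsSketch {

    /** Hypothetical, simplified stand-in for one mapped row. */
    public static final class MappedRow {
        public final String someId;
        public final float col3;
        public final String insertTimestamp;

        public MappedRow(String someId, float col3, String insertTimestamp) {
            this.someId = someId;
            this.col3 = col3;
            this.insertTimestamp = insertTimestamp;
        }
    }

    /** Collects rows per key, mimicking what grouping by some_id would produce. */
    public static Map<String, List<MappedRow>> groupByKey(Iterable<MappedRow> rows) {
        Map<String, List<MappedRow>> grouped = new LinkedHashMap<>();
        for (MappedRow row : rows) {
            grouped.computeIfAbsent(row.someId, k -> new ArrayList<>()).add(row);
        }
        return grouped;
    }

    /** Invented calculation: col3 of the latest row minus col3 of the earliest row. */
    public static float combine(Iterable<MappedRow> rowsForOneKey) {
        List<MappedRow> sorted = new ArrayList<>();
        rowsForOneKey.forEach(sorted::add);
        if (sorted.isEmpty()) {
            return 0f;
        }
        // A grouped iterable carries no ordering guarantee, so the order from
        // the SQL "order by" is restored here before using first and last.
        sorted.sort(Comparator.comparing((MappedRow r) -> r.insertTimestamp));
        return sorted.get(sorted.size() - 1).col3 - sorted.get(0).col3;
    }

    public static void main(String[] args) {
        List<MappedRow> rows = Arrays.asList(
            new MappedRow("a", 4.0f, "2021-11-10T10:00:00"),
            new MappedRow("b", 7.0f, "2021-11-10T08:00:00"),
            new MappedRow("a", 1.5f, "2021-11-10T09:00:00"));
        for (Map.Entry<String, List<MappedRow>> e : groupByKey(rows).entrySet()) {
            System.out.println(e.getKey() + " -> " + combine(e.getValue()));
        }
    }
}
```

Two points worth checking against your own pipeline: the values in a grouped iterable should not be assumed to arrive in query order, hence the explicit sort; and since the mapped rows are already keyed by some_id, grouping on that existing key instead of a singleton key may be enough if each id should be combined separately.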