You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/31 15:28:52 UTC
[1/2] incubator-beam git commit: [BEAM-871] Add StatementPreparator
on JdbcIO
Repository: incubator-beam
Updated Branches:
refs/heads/master 594892d11 -> 54a737402
[BEAM-871] Add StatementPreparator on JdbcIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/365b627e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/365b627e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/365b627e
Branch: refs/heads/master
Commit: 365b627ea0dfdc3e5d4cd3f0fe98c0ffb502e3be
Parents: 594892d
Author: Gareth Western <ga...@garethwestern.com>
Authored: Mon Oct 3 00:07:15 2016 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Oct 31 16:01:01 2016 +0100
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 39 ++++++++++++++++++++
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 33 +++++++++++++++++
2 files changed, 72 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/365b627e/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 505cdee..0e0703f 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -77,6 +77,27 @@ import org.apache.commons.dbcp2.BasicDataSource;
* })
* }</pre>
*
+ * <p>Query parameters can be configured using a user-provided {@link StatementPreparator}.
+ * For example:</p>
+ *
+ * <pre>{@code
+ * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
+ * .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ * "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ * "username", "password"))
+ * .withQuery("select id,name from Person where name = ?")
+ * .withStatementPreparator(new JdbcIO.StatementPreparator() {
+ * public void setParameters(PreparedStatement preparedStatement) throws Exception {
+ * preparedStatement.setString(1, "Darwin");
+ * }
+ * })
+ * .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
+ * public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
+ * return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ * }
+ * })
+ * }</pre>
+ *
* <h3>Writing to JDBC datasource</h3>
*
* <p>JDBC sink supports writing records into a database. It writes a {@link PCollection} to the
@@ -212,11 +233,20 @@ public class JdbcIO {
}
}
+ /**
+ * An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement}
+ * used to setParameters into the database.
+ */
+ public interface StatementPreparator extends Serializable {
+ void setParameters(PreparedStatement preparedStatement) throws Exception;
+ }
+
/** A {@link PTransform} to read data from a JDBC datasource. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
@Nullable abstract String getQuery();
+ @Nullable abstract StatementPreparator getStatementPreparator();
@Nullable abstract RowMapper<T> getRowMapper();
@Nullable abstract Coder<T> getCoder();
@@ -226,6 +256,7 @@ public class JdbcIO {
abstract static class Builder<T> {
abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
abstract Builder<T> setQuery(String query);
+ abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);
abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Read<T> build();
@@ -241,6 +272,11 @@ public class JdbcIO {
return toBuilder().setQuery(query).build();
}
+ public Read<T> withStatementPrepator(StatementPreparator statementPreparator) {
+ checkNotNull(statementPreparator, "statementPreparator");
+ return toBuilder().setStatementPreparator(statementPreparator).build();
+ }
+
public Read<T> withRowMapper(RowMapper<T> rowMapper) {
checkNotNull(rowMapper, "rowMapper");
return toBuilder().setRowMapper(rowMapper).build();
@@ -311,6 +347,9 @@ public class JdbcIO {
public void processElement(ProcessContext context) throws Exception {
String query = context.element();
try (PreparedStatement statement = connection.prepareStatement(query)) {
+ if (this.spec.getStatementPreparator() != null) {
+ this.spec.getStatementPreparator().setParameters(statement);
+ }
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
context.output(spec.getRowMapper().mapRow(resultSet));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/365b627e/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 860ca0f..c7b2de8 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -209,6 +209,39 @@ public class JdbcIOTest implements Serializable {
pipeline.run();
}
+ @Test
+ @Category(NeedsRunner.class)
+ public void testReadWithSingleStringParameter() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ PCollection<KV<String, Integer>> output = pipeline.apply(
+ JdbcIO.<KV<String, Integer>>read()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withQuery("select name,id from BEAM where name = ?")
+ .withStatementPrepator(new JdbcIO.StatementPreparator() {
+ @Override
+ public void setParameters(PreparedStatement preparedStatement)
+ throws Exception {
+ preparedStatement.setString(1, "Darwin");
+ }
+ })
+ .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
+ @Override
+ public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
+ KV<String, Integer> kv =
+ KV.of(resultSet.getString("name"), resultSet.getInt("id"));
+ return kv;
+ }
+ })
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+
+ PAssert.thatSingleton(
+ output.apply("Count One Scientist", Count.<KV<String, Integer>>globally()))
+ .isEqualTo(100L);
+
+ pipeline.run();
+ }
+
@Test
@Category(NeedsRunner.class)
public void testWrite() throws Exception {
[2/2] incubator-beam git commit: [BEAM-871] This closes #1037
Posted by jb...@apache.org.
[BEAM-871] This closes #1037
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54a73740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54a73740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54a73740
Branch: refs/heads/master
Commit: 54a737402b61c5a5a707fabde55affaea716eca2
Parents: 594892d 365b627
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Mon Oct 31 16:14:57 2016 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Oct 31 16:14:57 2016 +0100
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 39 ++++++++++++++++++++
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 33 +++++++++++++++++
2 files changed, 72 insertions(+)
----------------------------------------------------------------------