You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/31 15:28:52 UTC

[1/2] incubator-beam git commit: [BEAM-871] Add StatementPreparator on JdbcIO

Repository: incubator-beam
Updated Branches:
  refs/heads/master 594892d11 -> 54a737402


[BEAM-871] Add StatementPreparator on JdbcIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/365b627e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/365b627e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/365b627e

Branch: refs/heads/master
Commit: 365b627ea0dfdc3e5d4cd3f0fe98c0ffb502e3be
Parents: 594892d
Author: Gareth Western <ga...@garethwestern.com>
Authored: Mon Oct 3 00:07:15 2016 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Oct 31 16:01:01 2016 +0100

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 39 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 33 +++++++++++++++++
 2 files changed, 72 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/365b627e/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 505cdee..0e0703f 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -77,6 +77,27 @@ import org.apache.commons.dbcp2.BasicDataSource;
  *   })
  * }</pre>
  *
+ * <p>Query parameters can be configured using a user-provided {@link StatementPreparator}.
+ * For example:</p>
+ *
+ * <pre>{@code
+ * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
+ *   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ *       "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ *       "username", "password"))
+ *   .withQuery("select id,name from Person where name = ?")
+ *   .withStatementPreparator(new JdbcIO.StatementPreparator() {
+ *     public void setParameters(PreparedStatement preparedStatement) throws Exception {
+ *       preparedStatement.setString(1, "Darwin");
+ *     }
+ *   })
+ *   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
+ *     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
+ *       return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ *     }
+ *   })
+ * }</pre>
+ *
  * <h3>Writing to JDBC datasource</h3>
  *
  * <p>JDBC sink supports writing records into a database. It writes a {@link PCollection} to the
@@ -212,11 +233,20 @@ public class JdbcIO {
     }
   }
 
+  /**
+   * An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement}
+   * used to setParameters into the database.
+   */
+  public interface StatementPreparator extends Serializable {
+    void setParameters(PreparedStatement preparedStatement) throws Exception;
+  }
+
   /** A {@link PTransform} to read data from a JDBC datasource. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
     @Nullable abstract String getQuery();
+    @Nullable abstract StatementPreparator getStatementPreparator();
     @Nullable abstract RowMapper<T> getRowMapper();
     @Nullable abstract Coder<T> getCoder();
 
@@ -226,6 +256,7 @@ public class JdbcIO {
     abstract static class Builder<T> {
       abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
       abstract Builder<T> setQuery(String query);
+      abstract Builder<T> setStatementPreparator(StatementPreparator statementPreparator);
       abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Read<T> build();
@@ -241,6 +272,11 @@ public class JdbcIO {
       return toBuilder().setQuery(query).build();
     }
 
+    public Read<T> withStatementPrepator(StatementPreparator statementPreparator) {
+      checkNotNull(statementPreparator, "statementPreparator");
+      return toBuilder().setStatementPreparator(statementPreparator).build();
+    }
+
     public Read<T> withRowMapper(RowMapper<T> rowMapper) {
       checkNotNull(rowMapper, "rowMapper");
       return toBuilder().setRowMapper(rowMapper).build();
@@ -311,6 +347,9 @@ public class JdbcIO {
       public void processElement(ProcessContext context) throws Exception {
         String query = context.element();
         try (PreparedStatement statement = connection.prepareStatement(query)) {
+          if (this.spec.getStatementPreparator() != null) {
+            this.spec.getStatementPreparator().setParameters(statement);
+          }
           try (ResultSet resultSet = statement.executeQuery()) {
             while (resultSet.next()) {
               context.output(spec.getRowMapper().mapRow(resultSet));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/365b627e/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 860ca0f..c7b2de8 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -209,6 +209,39 @@ public class JdbcIOTest implements Serializable {
     pipeline.run();
   }
 
+   @Test
+   @Category(NeedsRunner.class)
+   public void testReadWithSingleStringParameter() throws Exception {
+     TestPipeline pipeline = TestPipeline.create();
+
+     PCollection<KV<String, Integer>> output = pipeline.apply(
+             JdbcIO.<KV<String, Integer>>read()
+                     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+                     .withQuery("select name,id from BEAM where name = ?")
+                     .withStatementPrepator(new JdbcIO.StatementPreparator() {
+                       @Override
+                       public void setParameters(PreparedStatement preparedStatement)
+                               throws Exception {
+                         preparedStatement.setString(1, "Darwin");
+                       }
+                     })
+                     .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
+                       @Override
+                       public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
+                         KV<String, Integer> kv =
+                                 KV.of(resultSet.getString("name"), resultSet.getInt("id"));
+                         return kv;
+                       }
+                     })
+                     .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+
+     PAssert.thatSingleton(
+             output.apply("Count One Scientist", Count.<KV<String, Integer>>globally()))
+             .isEqualTo(100L);
+
+     pipeline.run();
+   }
+
   @Test
   @Category(NeedsRunner.class)
   public void testWrite() throws Exception {


[2/2] incubator-beam git commit: [BEAM-871] This closes #1037

Posted by jb...@apache.org.
[BEAM-871] This closes #1037


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54a73740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54a73740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54a73740

Branch: refs/heads/master
Commit: 54a737402b61c5a5a707fabde55affaea716eca2
Parents: 594892d 365b627
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Mon Oct 31 16:14:57 2016 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Mon Oct 31 16:14:57 2016 +0100

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 39 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 33 +++++++++++++++++
 2 files changed, 72 insertions(+)
----------------------------------------------------------------------