Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/02 09:01:39 UTC

[1/2] incubator-beam git commit: [BEAM-244] Add JDBC IO

Repository: incubator-beam
Updated Branches:
  refs/heads/master bc80ee342 -> c5c343659


[BEAM-244] Add JDBC IO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/19fad184
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/19fad184
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/19fad184

Branch: refs/heads/master
Commit: 19fad184ac0f8521770dff96bdad5bff2ef9aa03
Parents: bc80ee3
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Sep 5 12:57:14 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 10:35:03 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/jdbc/pom.xml                       | 138 ++++++
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 427 +++++++++++++++++++
 .../apache/beam/sdk/io/jdbc/package-info.java   |  22 +
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 236 ++++++++++
 sdks/java/io/pom.xml                            |   1 +
 5 files changed, 824 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
new file mode 100644
index 0000000..75eb5ed
--- /dev/null
+++ b/sdks/java/io/jdbc/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.3.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-jdbc</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: JDBC</name>
+  <description>IO to read from and write to JDBC datasources.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+      <version>2.1.1</version>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <version>10.12.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derbyclient</artifactId>
+      <version>10.12.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derbynet</artifactId>
+      <version>10.12.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
new file mode 100644
index 0000000..f4c3cab
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Random;
+
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.dbcp2.BasicDataSource;
+
+/**
+ * IO to read and write data using JDBC.
+ *
+ * <h3>Reading from a JDBC datasource</h3>
+ *
+ * <p>JdbcIO source returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is the
+ * type returned by the provided {@link RowMapper}.
+ *
+ * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using
+ * {@link DataSourceConfiguration#create} with either a {@link DataSource} (which must be
+ * {@link Serializable}) or the parameters needed to create it (driver class name, url, and
+ * optionally username and password). For example:
+ *
+ * <pre>{@code
+ * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
+ *   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ *       "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ *       "username", "password"))
+ *   .withQuery("select id,name from Person")
+ *   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
+ *     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
+ *       return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ *     }
+ *   })
+ *   .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
+ * );
+ * }</pre>
+ *
+ * <h3>Writing to a JDBC datasource</h3>
+ *
+ * <p>JDBC sink supports writing records into a database. It writes a {@link PCollection} to the
+ * database by setting the parameters of a {@link PreparedStatement} for each element via a
+ * user-provided {@link PreparedStatementSetter}.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DataSourceConfiguration}.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(JdbcIO.<KV<Integer, String>>write()
+ *      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ *         "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ *         "username", "password"))
+ *      .withStatement("insert into Person values(?, ?)")
+ *      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+ *        public void setParameters(KV<Integer, String> element, PreparedStatement query)
+ *            throws Exception {
+ *          query.setInt(1, element.getKey());
+ *          query.setString(2, element.getValue());
+ *        }
+ *      })
+ *   );
+ * }</pre>
+ *
+ * <p>NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple
+ * times for fault tolerance. Because of that, you should avoid using {@code INSERT} statements,
+ * since that risks duplicating records in the database, or failing due to primary key conflicts.
+ * Consider using <a href="https://en.wikipedia.org/wiki/Merge_(SQL)">MERGE ("upsert")
+ * statements</a> supported by your database instead.
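+ *
+ * <p>For example (illustrative only; the exact {@code MERGE} syntax varies by database):
+ *
+ * <pre>{@code
+ *   .withStatement("merge into Person using (values (?, ?)) as incoming(id, name) "
+ *       + "on Person.id = incoming.id "
+ *       + "when matched then update set name = incoming.name "
+ *       + "when not matched then insert (id, name) values (incoming.id, incoming.name)")
+ * }</pre>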
+ */
+public class JdbcIO {
+  /**
+   * Read data from a JDBC datasource.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return new AutoValue_JdbcIO_Read.Builder<T>().build();
+  }
+
+  /**
+   * Write data to a JDBC datasource.
+   *
+   * @param <T> Type of the data to be written.
+   */
+  public static <T> Write<T> write() {
+    return new AutoValue_JdbcIO_Write.Builder<T>().build();
+  }
+
+  private JdbcIO() {}
+
+  /**
+   * An interface used by {@link JdbcIO.Read} for converting each row of the {@link ResultSet} into
+   * an element of the resulting {@link PCollection}.
+   */
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(ResultSet resultSet) throws Exception;
+  }
+
+  /**
+   * A POJO describing a {@link DataSource}, either directly providing a {@link DataSource}
+   * instance or the properties needed to create one.
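+   *
+   * <p>Both forms appear in the tests of this module; {@code dataSource} below stands for any
+   * {@link Serializable} {@link DataSource}:
+   *
+   * <pre>{@code
+   * // wrap an existing Serializable DataSource...
+   * DataSourceConfiguration.create(dataSource);
+   * // ...or give a driver class and url so a DataSource is created at execution time
+   * DataSourceConfiguration.create("org.apache.derby.jdbc.ClientDriver",
+   *     "jdbc:derby://localhost:1527/target/beam");
+   * }</pre>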
+   */
+  @AutoValue
+  abstract static class DataSourceConfiguration implements Serializable {
+    @Nullable abstract String getDriverClassName();
+    @Nullable abstract String getUrl();
+    @Nullable abstract String getUsername();
+    @Nullable abstract String getPassword();
+    @Nullable abstract DataSource getDataSource();
+
+    /** Configuration using a {@link Serializable} {@link DataSource}. */
+    public static DataSourceConfiguration create(DataSource dataSource) {
+      checkNotNull(dataSource, "dataSource");
+      checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
+      return new AutoValue_JdbcIO_DataSourceConfiguration(null, null, null, null, dataSource);
+    }
+
+    /** Configuration using the given driver, url, username and password. */
+    public static DataSourceConfiguration create(
+        String driverClassName, String url, String username, String password) {
+      checkNotNull(driverClassName, "driverClassName");
+      checkNotNull(url, "url");
+      checkNotNull(username, "username");
+      checkNotNull(password, "password");
+      return new AutoValue_JdbcIO_DataSourceConfiguration(
+          driverClassName, url, username, password, null);
+    }
+
+    /** Configuration using the given driver and url, without a username and password. */
+    public static DataSourceConfiguration create(String driverClassName, String url) {
+      checkNotNull(driverClassName, "driverClassName");
+      checkNotNull(url, "url");
+      return new AutoValue_JdbcIO_DataSourceConfiguration(driverClassName, url, null, null, null);
+    }
+
+    private void populateDisplayData(DisplayData.Builder builder) {
+      if (getDataSource() != null) {
+        builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));
+      } else {
+        builder.addIfNotNull(DisplayData.item("jdbcDriverClassName", getDriverClassName()));
+        builder.addIfNotNull(DisplayData.item("jdbcUrl", getUrl()));
+        builder.addIfNotNull(DisplayData.item("username", getUsername()));
+      }
+    }
+
+    Connection getConnection() throws Exception {
+      DataSource dataSource;
+      if (getDataSource() != null) {
+        dataSource = getDataSource();
+      } else {
+        BasicDataSource basicDataSource = new BasicDataSource();
+        basicDataSource.setDriverClassName(getDriverClassName());
+        basicDataSource.setUrl(getUrl());
+        basicDataSource.setUsername(getUsername());
+        basicDataSource.setPassword(getPassword());
+        dataSource = basicDataSource;
+      }
+      return (getUsername() == null)
+          ? dataSource.getConnection()
+          : dataSource.getConnection(getUsername(), getPassword());
+    }
+  }
+
+  /** A {@link PTransform} to read data from a JDBC datasource. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
+    @Nullable abstract String getQuery();
+    @Nullable abstract RowMapper<T> getRowMapper();
+    @Nullable abstract Coder<T> getCoder();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
+      abstract Builder<T> setQuery(String query);
+      abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(DataSourceConfiguration configuration) {
+      checkNotNull(configuration, "configuration");
+      return toBuilder().setDataSourceConfiguration(configuration).build();
+    }
+
+    public Read<T> withQuery(String query) {
+      checkNotNull(query, "query");
+      return toBuilder().setQuery(query).build();
+    }
+
+    public Read<T> withRowMapper(RowMapper<T> rowMapper) {
+      checkNotNull(rowMapper, "rowMapper");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      checkNotNull(coder, "coder");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> apply(PBegin input) {
+      return input
+          .apply(Create.of(getQuery()))
+          .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
+          // generate a random key followed by a GroupByKey and then ungroup
+          // to prevent fusion
+          // see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
+          // for details
+          .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
+            private Random random;
+            @Setup
+            public void setup() {
+              random = new Random();
+            }
+            @ProcessElement
+            public void processElement(ProcessContext context) {
+              context.output(KV.of(random.nextInt(), context.element()));
+            }
+          }))
+          .apply(GroupByKey.<Integer, T>create())
+          .apply(Values.<Iterable<T>>create())
+          .apply(Flatten.<T>iterables());
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(getQuery(), "query");
+      checkNotNull(getRowMapper(), "rowMapper");
+      checkNotNull(getCoder(), "coder");
+      checkNotNull(getDataSourceConfiguration(), "dataSourceConfiguration");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("query", getQuery()));
+      builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+      builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+      getDataSourceConfiguration().populateDisplayData(builder);
+    }
+
+    /** A {@link DoFn} executing the SQL query to read from the database. */
+    static class ReadFn<T> extends DoFn<String, T> {
+      private JdbcIO.Read<T> spec;
+      private Connection connection;
+
+      private ReadFn(Read<T> spec) {
+        this.spec = spec;
+      }
+
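+      // One JDBC connection per DoFn instance: opened in @Setup, closed in @Teardown.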
+      @Setup
+      public void setup() throws Exception {
+        connection = spec.getDataSourceConfiguration().getConnection();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        String query = context.element();
+        try (PreparedStatement statement = connection.prepareStatement(query)) {
+          try (ResultSet resultSet = statement.executeQuery()) {
+            while (resultSet.next()) {
+              context.output(spec.getRowMapper().mapRow(resultSet));
+            }
+          }
+        }
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        if (connection != null) {
+          connection.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * An interface used by {@link JdbcIO.Write} to set the parameters of the
+   * {@link PreparedStatement} used to write the data into the database.
+   */
+  public interface PreparedStatementSetter<T> extends Serializable {
+    void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
+  }
+
+  /** A {@link PTransform} to write to a JDBC datasource. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
+    @Nullable abstract String getStatement();
+    @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
+      abstract Builder<T> setStatement(String statement);
+      abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
+
+      abstract Write<T> build();
+    }
+
+    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+      checkNotNull(config, "configuration");
+      return toBuilder().setDataSourceConfiguration(config).build();
+    }
+
+    public Write<T> withStatement(String statement) {
+      checkNotNull(statement, "statement");
+      return toBuilder().setStatement(statement).build();
+    }
+
+    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+      checkNotNull(setter, "preparedStatementSetter");
+      return toBuilder().setPreparedStatementSetter(setter).build();
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      input.apply(ParDo.of(new WriteFn<T>(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<T> input) {
+      checkNotNull(getDataSourceConfiguration(), "dataSourceConfiguration");
+      checkNotNull(getStatement(), "statement");
+      checkNotNull(getPreparedStatementSetter(), "preparedStatementSetter");
+    }
+
+    private static class WriteFn<T> extends DoFn<T, Void> {
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final Write<T> spec;
+
+      private Connection connection;
+      private PreparedStatement preparedStatement;
+      private int batchCount;
+
+      public WriteFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = spec.getDataSourceConfiguration().getConnection();
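+        // Disable auto-commit: batches are committed explicitly in finishBundle.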
+        connection.setAutoCommit(false);
+        preparedStatement = connection.prepareStatement(spec.getStatement());
+      }
+
+      @StartBundle
+      public void startBundle(Context context) {
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        T record = context.element();
+
+        preparedStatement.clearParameters();
+        spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
+        preparedStatement.addBatch();
+
+        batchCount++;
+
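+        // Flush early once DEFAULT_BATCH_SIZE records are buffered, reusing the
+        // end-of-bundle executeBatch/commit path.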
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          finishBundle(context);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(Context context) throws Exception {
+        if (batchCount > 0) {
+          preparedStatement.executeBatch();
+          connection.commit();
+          batchCount = 0;
+        }
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        try {
+          if (preparedStatement != null) {
+            preparedStatement.close();
+          }
+        } finally {
+          if (connection != null) {
+            connection.close();
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
new file mode 100644
index 0000000..b568800
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from and writing to JDBC datasources.
+ */
+package org.apache.beam.sdk.io.jdbc;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
new file mode 100644
index 0000000..b3073a2
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.derby.jdbc.ClientDataSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link JdbcIO}.
+ */
+public class JdbcIOTest implements Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcIOTest.class);
+
+  private static NetworkServerControl derbyServer;
+  private static ClientDataSource dataSource;
+
+  @BeforeClass
+  public static void startDatabase() throws Exception {
+    System.setProperty("derby.stream.error.file", "target/derby.log");
+
+    derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"), 1527);
+    derbyServer.start(null);
+
+    dataSource = new ClientDataSource();
+    dataSource.setCreateDatabase("create");
+    dataSource.setDatabaseName("target/beam");
+    dataSource.setServerName("localhost");
+    dataSource.setPortNumber(1527);
+
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate("create table BEAM(id INT, name VARCHAR(500))");
+      }
+    }
+  }
+
+  @AfterClass
+  public static void shutDownDatabase() throws Exception {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate("drop table BEAM");
+      }
+    } finally {
+      if (derbyServer != null) {
+        derbyServer.shutdown();
+      }
+    }
+  }
+
+  @Before
+  public void initTable() throws Exception {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate("delete from BEAM");
+      }
+
+      String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+          "Newton", "Bohr", "Galilei", "Maxwell"};
+      connection.setAutoCommit(false);
+      try (PreparedStatement preparedStatement =
+               connection.prepareStatement("insert into BEAM values (?,?)")) {
+        for (int i = 0; i < 1000; i++) {
+          int index = i % scientists.length;
+          preparedStatement.clearParameters();
+          preparedStatement.setInt(1, i);
+          preparedStatement.setString(2, scientists[index]);
+          preparedStatement.executeUpdate();
+        }
+      }
+
+      connection.commit();
+    }
+  }
+
+  @Test
+  public void testDataSourceConfigurationDataSource() throws Exception {
+    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
+        dataSource);
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
+  public void testDataSourceConfigurationDriverAndUrl() throws Exception {
+    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
+        "org.apache.derby.jdbc.ClientDriver",
+        "jdbc:derby://localhost:1527/target/beam");
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testRead() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline.apply(
+        JdbcIO.<KV<String, Integer>>read()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+            .withQuery("select name,id from BEAM")
+            .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
+              @Override
+              public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
+                return KV.of(resultSet.getString("name"), resultSet.getInt("id"));
+              }
+            })
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+
+    PAssert.thatSingleton(
+        output.apply("Count All", Count.<KV<String, Integer>>globally()))
+        .isEqualTo(1000L);
+
+    PAssert.that(output
+        .apply("Count Scientist", Count.<String, Integer>perKey())
+    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+      @Override
+      public Void apply(Iterable<KV<String, Long>> input) {
+        for (KV<String, Long> element : input) {
+          assertEquals(element.getKey(), 100L, element.getValue().longValue());
+        }
+        return null;
+      }
+    });
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWrite() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    ArrayList<KV<Integer, String>> data = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      KV<Integer, String> kv = KV.of(i, "Test");
+      data.add(kv);
+    }
+    pipeline.apply(Create.of(data))
+        .apply(JdbcIO.<KV<Integer, String>>write()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+                "org.apache.derby.jdbc.ClientDriver",
+                "jdbc:derby://localhost:1527/target/beam"))
+            .withStatement("insert into BEAM values(?, ?)")
+            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+              public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+                  throws Exception {
+                statement.setInt(1, element.getKey());
+                statement.setString(2, element.getValue());
+              }
+            }));
+
+    pipeline.run();
+
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        try (ResultSet resultSet = statement.executeQuery("select count(*) from BEAM")) {
+          resultSet.next();
+          int count = resultSet.getInt(1);
+
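+          // 1000 rows seeded by initTable() plus 1000 written by the pipeline.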
+          Assert.assertEquals(2000, count);
+        }
+      }
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithEmptyPCollection() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    pipeline.apply(Create.of(new ArrayList<KV<Integer, String>>()))
+        .apply(JdbcIO.<KV<Integer, String>>write()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+                "org.apache.derby.jdbc.ClientDriver",
+                "jdbc:derby://localhost:1527/target/beam"))
+            .withStatement("insert into BEAM values(?, ?)")
+            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+              public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+                  throws Exception {
+                statement.setInt(1, element.getKey());
+                statement.setString(2, element.getValue());
+              }
+            }));
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index c4c32ed..82cf8d0 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -39,6 +39,7 @@
     <module>kafka</module>
     <module>kinesis</module>
     <module>mongodb</module>
+    <module>jdbc</module>
   </modules>
 
 </project>


[2/2] incubator-beam git commit: [BEAM-244] This closes #942

Posted by jb...@apache.org.
[BEAM-244] This closes #942


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5c34365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5c34365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5c34365

Branch: refs/heads/master
Commit: c5c343659ea7a597b2b6a5fe7efcec001f17a8f9
Parents: bc80ee3 19fad18
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Oct 2 10:37:27 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 10:37:27 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/jdbc/pom.xml                       | 138 ++++++
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 427 +++++++++++++++++++
 .../apache/beam/sdk/io/jdbc/package-info.java   |  22 +
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 236 ++++++++++
 sdks/java/io/pom.xml                            |   1 +
 5 files changed, 824 insertions(+)
----------------------------------------------------------------------