You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/10/02 09:01:39 UTC
[1/2] incubator-beam git commit: [BEAM-244] Add JDBC IO
Repository: incubator-beam
Updated Branches:
refs/heads/master bc80ee342 -> c5c343659
[BEAM-244] Add JDBC IO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/19fad184
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/19fad184
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/19fad184
Branch: refs/heads/master
Commit: 19fad184ac0f8521770dff96bdad5bff2ef9aa03
Parents: bc80ee3
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Sep 5 12:57:14 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 10:35:03 2016 +0200
----------------------------------------------------------------------
sdks/java/io/jdbc/pom.xml | 138 ++++++
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 427 +++++++++++++++++++
.../apache/beam/sdk/io/jdbc/package-info.java | 22 +
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 236 ++++++++++
sdks/java/io/pom.xml | 1 +
5 files changed, 824 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
new file mode 100644
index 0000000..75eb5ed
--- /dev/null
+++ b/sdks/java/io/jdbc/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>0.3.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-jdbc</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: JDBC</name>
+ <description>IO to read and write on JDBC datasource.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.12.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derbyclient</artifactId>
+ <version>10.12.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derbynet</artifactId>
+ <version>10.12.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
new file mode 100644
index 0000000..f4c3cab
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Random;
+
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.dbcp2.BasicDataSource;
+
+/**
+ * IO to read and write data on JDBC.
+ *
+ * <h3>Reading from JDBC datasource</h3>
+ *
+ * <p>JdbcIO source returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is the
+ * type returned by the provided {@link RowMapper}.
+ *
+ * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using
+ * {@link DataSourceConfiguration#create} with either a {@link DataSource} (which must be
+ * {@link Serializable}) or the parameters needed to create it (driver class name, url, and
+ * optionally username and password). For example:
+ *
+ * <pre>{@code
+ * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
+ * .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ * "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ * "username", "password"))
+ * .withQuery("select id,name from Person")
+ * .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
+ * public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
+ * return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ * }
+ * })
+ * }</pre>
+ *
+ * <h3>Writing to JDBC datasource</h3>
+ *
+ * <p>JDBC sink supports writing records into a database. It writes a {@link PCollection} to the
+ * database by converting each T into a {@link PreparedStatement} via a user-provided {@link
+ * PreparedStatementSetter}.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DataSourceConfiguration}.
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(...)
+ * .apply(JdbcIO.<KV<Integer, String>>write()
+ * .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ * "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ * "username", "password"))
+ * .withStatement("insert into Person values(?, ?)")
+ * .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+ * public void setParameters(KV<Integer, String> element, PreparedStatement query) {
+ * query.setInt(1, element.getKey());
+ * query.setString(2, element.getValue());
+ * }
+ * })
+ * }</pre>
+ *
+ * <p>NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple
+ * times for fault tolerance. Because of that, you should avoid using {@code INSERT} statements,
+ * since that risks duplicating records in the database, or failing due to primary key conflicts.
+ * Consider using <a href="https://en.wikipedia.org/wiki/Merge_(SQL)">MERGE ("upsert")
+ * statements</a> supported by your database instead.
+ */
+public class JdbcIO {
+ /**
+ * Read data from a JDBC datasource.
+ *
+ * @param <T> Type of the data to be read.
+ */
+ public static <T> Read<T> read() {
+ return new AutoValue_JdbcIO_Read.Builder<T>().build();
+ }
+
+ /**
+ * Write data to a JDBC datasource.
+ *
+ * @param <T> Type of the data to be written.
+ */
+ public static <T> Write<T> write() {
+ return new AutoValue_JdbcIO_Write.Builder<T>().build();
+ }
+
+ private JdbcIO() {}
+
+ /**
+ * An interface used by {@link JdbcIO.Read} for converting each row of the {@link ResultSet} into
+ * an element of the resulting {@link PCollection}.
+ */
+ public interface RowMapper<T> extends Serializable {
+ T mapRow(ResultSet resultSet) throws Exception;
+ }
+
+ /**
+ * A POJO describing a {@link DataSource}, either providing directly a {@link DataSource} or all
+ * properties allowing to create a {@link DataSource}.
+ */
+ @AutoValue
+ abstract static class DataSourceConfiguration implements Serializable {
+ @Nullable abstract String getDriverClassName();
+ @Nullable abstract String getUrl();
+ @Nullable abstract String getUsername();
+ @Nullable abstract String getPassword();
+ @Nullable abstract DataSource getDataSource();
+
+ /** Configuration using a {@link Serializable} {@link DataSource}. */
+ public static DataSourceConfiguration create(DataSource dataSource) {
+ checkNotNull(dataSource, "dataSource");
+ checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
+ return new AutoValue_JdbcIO_DataSourceConfiguration(null, null, null, null, dataSource);
+ }
+
+ /** Configuration using the given driver, url, username and password. */
+ public static DataSourceConfiguration create(
+ String driverClassName, String url, String username, String password) {
+ checkNotNull(driverClassName, "driverClassName");
+ checkNotNull(url, "url");
+ checkNotNull(username, "username");
+ checkNotNull(password, "password");
+ return new AutoValue_JdbcIO_DataSourceConfiguration(
+ driverClassName, url, username, password, null);
+ }
+
+ /** Configuration using the given driver and url, without a username and password. */
+ public static DataSourceConfiguration create(String driverClassName, String url) {
+ checkNotNull(driverClassName, "driverClassName");
+ checkNotNull(url, "url");
+ return new AutoValue_JdbcIO_DataSourceConfiguration(driverClassName, url, null, null, null);
+ }
+
+ private void populateDisplayData(DisplayData.Builder builder) {
+ if (getDataSource() != null) {
+ builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));
+ } else {
+ builder.addIfNotNull(DisplayData.item("jdbcDriverClassName", getDriverClassName()));
+ builder.addIfNotNull(DisplayData.item("jdbcUrl", getUrl()));
+ builder.addIfNotNull(DisplayData.item("username", getUsername()));
+ }
+ }
+
+ Connection getConnection() throws Exception {
+ DataSource dataSource;
+ if (getDataSource() != null) {
+ dataSource = getDataSource();
+ } else {
+ BasicDataSource basicDataSource = new BasicDataSource();
+ basicDataSource.setDriverClassName(getDriverClassName());
+ basicDataSource.setUrl(getUrl());
+ basicDataSource.setUsername(getUsername());
+ basicDataSource.setPassword(getPassword());
+ dataSource = basicDataSource;
+ }
+ return (getUsername() == null)
+ ? dataSource.getConnection()
+ : dataSource.getConnection(getUsername(), getPassword());
+ }
+ }
+
+ /** A {@link PTransform} to read data from a JDBC datasource. */
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
+ @Nullable abstract String getQuery();
+ @Nullable abstract RowMapper<T> getRowMapper();
+ @Nullable abstract Coder<T> getCoder();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
+ abstract Builder<T> setQuery(String query);
+ abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
+ abstract Builder<T> setCoder(Coder<T> coder);
+ abstract Read<T> build();
+ }
+
+ public Read<T> withDataSourceConfiguration(DataSourceConfiguration configuration) {
+ checkNotNull(configuration, "configuration");
+ return toBuilder().setDataSourceConfiguration(configuration).build();
+ }
+
+ public Read<T> withQuery(String query) {
+ checkNotNull(query, "query");
+ return toBuilder().setQuery(query).build();
+ }
+
+ public Read<T> withRowMapper(RowMapper<T> rowMapper) {
+ checkNotNull(rowMapper, "rowMapper");
+ return toBuilder().setRowMapper(rowMapper).build();
+ }
+
+ public Read<T> withCoder(Coder<T> coder) {
+ checkNotNull(coder, "coder");
+ return toBuilder().setCoder(coder).build();
+ }
+
+ @Override
+ public PCollection<T> apply(PBegin input) {
+ return input
+ .apply(Create.of(getQuery()))
+ .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
+ // generate a random key followed by a GroupByKey and then ungroup
+ // to prevent fusion
+ // see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
+ // for details
+ .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
+ private Random random;
+ @Setup
+ public void setup() {
+ random = new Random();
+ }
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ context.output(KV.of(random.nextInt(), context.element()));
+ }
+ }))
+ .apply(GroupByKey.<Integer, T>create())
+ .apply(Values.<Iterable<T>>create())
+ .apply(Flatten.<T>iterables());
+ }
+
+ @Override
+ public void validate(PBegin input) {
+ checkNotNull(getQuery(), "query");
+ checkNotNull(getRowMapper(), "rowMapper");
+ checkNotNull(getCoder(), "coder");
+ checkNotNull(getDataSourceConfiguration());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("query", getQuery()));
+ builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+ builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+ getDataSourceConfiguration().populateDisplayData(builder);
+ }
+
+ /** A {@link DoFn} executing the SQL query to read from the database. */
+ static class ReadFn<T> extends DoFn<String, T> {
+ private JdbcIO.Read<T> spec;
+ private Connection connection;
+
+ private ReadFn(Read<T> spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ connection = spec.getDataSourceConfiguration().getConnection();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ String query = context.element();
+ try (PreparedStatement statement = connection.prepareStatement(query)) {
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ context.output(spec.getRowMapper().mapRow(resultSet));
+ }
+ }
+ }
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * An interface used by the JdbcIO Write to set the parameters of the {@link PreparedStatement}
+ * used to write into the database.
+ */
+ public interface PreparedStatementSetter<T> extends Serializable {
+ void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
+ }
+
+ /** A {@link PTransform} to write to a JDBC datasource. */
+ @AutoValue
+ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
+ @Nullable abstract String getStatement();
+ @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
+ abstract Builder<T> setStatement(String statement);
+ abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
+
+ abstract Write<T> build();
+ }
+
+ public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+ return toBuilder().setDataSourceConfiguration(config).build();
+ }
+ public Write<T> withStatement(String statement) {
+ return toBuilder().setStatement(statement).build();
+ }
+ public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+ return toBuilder().setPreparedStatementSetter(setter).build();
+ }
+
+ @Override
+ public PDone apply(PCollection<T> input) {
+ input.apply(ParDo.of(new WriteFn<T>(this)));
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ public void validate(PCollection<T> input) {
+ checkNotNull(getDataSourceConfiguration(), "dataSourceConfiguration");
+ checkNotNull(getStatement(), "statement");
+ checkNotNull(getPreparedStatementSetter(), "preparedStatementSetter");
+ }
+
+ private static class WriteFn<T> extends DoFn<T, Void> {
+ private static final int DEFAULT_BATCH_SIZE = 1000;
+
+ private final Write<T> spec;
+
+ private Connection connection;
+ private PreparedStatement preparedStatement;
+ private int batchCount;
+
+ public WriteFn(Write<T> spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ connection = spec.getDataSourceConfiguration().getConnection();
+ connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(spec.getStatement());
+ }
+
+ @StartBundle
+ public void startBundle(Context context) {
+ batchCount = 0;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ T record = context.element();
+
+ preparedStatement.clearParameters();
+ spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
+ preparedStatement.addBatch();
+
+ batchCount++;
+
+ if (batchCount >= DEFAULT_BATCH_SIZE) {
+ finishBundle(context);
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(Context context) throws Exception {
+ if (batchCount > 0) {
+ preparedStatement.executeBatch();
+ connection.commit();
+ batchCount = 0;
+ }
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ try {
+ if (preparedStatement != null) {
+ preparedStatement.close();
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
new file mode 100644
index 0000000..b568800
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from JDBC.
+ */
+package org.apache.beam.sdk.io.jdbc;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
new file mode 100644
index 0000000..b3073a2
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.derby.jdbc.ClientDataSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test on the JdbcIO.
+ */
+public class JdbcIOTest implements Serializable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcIOTest.class);
+
+ private static NetworkServerControl derbyServer;
+ private static ClientDataSource dataSource;
+
+ @BeforeClass
+ public static void startDatabase() throws Exception {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+
+ derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"), 1527);
+ derbyServer.start(null);
+
+ dataSource = new ClientDataSource();
+ dataSource.setCreateDatabase("create");
+ dataSource.setDatabaseName("target/beam");
+ dataSource.setServerName("localhost");
+ dataSource.setPortNumber(1527);
+
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate("create table BEAM(id INT, name VARCHAR(500))");
+ }
+ }
+ }
+
+ @AfterClass
+ public static void shutDownDatabase() throws Exception {
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate("drop table BEAM");
+ }
+ } finally {
+ if (derbyServer != null) {
+ derbyServer.shutdown();
+ }
+ }
+ }
+
+ @Before
+ public void initTable() throws Exception {
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate("delete from BEAM");
+ }
+
+ String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+ "Newton", "Bohr", "Galilei", "Maxwell"};
+ connection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement("insert into BEAM " + "values (?,?)")) {
+ for (int i = 0; i < 1000; i++) {
+ int index = i % scientists.length;
+ preparedStatement.clearParameters();
+ preparedStatement.setInt(1, i);
+ preparedStatement.setString(2, scientists[index]);
+ preparedStatement.executeUpdate();
+ }
+ }
+
+ connection.commit();
+ }
+ }
+
+ @Test
+ public void testDataSourceConfigurationDataSource() throws Exception {
+ JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
+ dataSource);
+ try (Connection conn = config.getConnection()) {
+ assertTrue(conn.isValid(0));
+ }
+ }
+
+ @Test
+ public void testDataSourceConfigurationDriverAndUrl() throws Exception {
+ JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
+ "org.apache.derby.jdbc.ClientDriver",
+ "jdbc:derby://localhost:1527/target/beam");
+ try (Connection conn = config.getConnection()) {
+ assertTrue(conn.isValid(0));
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testRead() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ PCollection<KV<String, Integer>> output = pipeline.apply(
+ JdbcIO.<KV<String, Integer>>read()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withQuery("select name,id from BEAM")
+ .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
+ @Override
+ public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
+ KV<String, Integer> kv =
+ KV.of(resultSet.getString("name"), resultSet.getInt("id"));
+ return kv;
+ }
+ })
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+
+ PAssert.thatSingleton(
+ output.apply("Count All", Count.<KV<String, Integer>>globally()))
+ .isEqualTo(1000L);
+
+ PAssert.that(output
+ .apply("Count Scientist", Count.<String, Integer>perKey())
+ ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+ @Override
+ public Void apply(Iterable<KV<String, Long>> input) {
+ for (KV<String, Long> element : input) {
+ assertEquals(element.getKey(), 100L, element.getValue().longValue());
+ }
+ return null;
+ }
+ });
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWrite() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ ArrayList<KV<Integer, String>> data = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ KV<Integer, String> kv = KV.of(i, "Test");
+ data.add(kv);
+ }
+ pipeline.apply(Create.of(data))
+ .apply(JdbcIO.<KV<Integer, String>>write()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ "org.apache.derby.jdbc.ClientDriver",
+ "jdbc:derby://localhost:1527/target/beam"))
+ .withStatement("insert into BEAM values(?, ?)")
+ .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+ public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+ throws Exception {
+ statement.setInt(1, element.getKey());
+ statement.setString(2, element.getValue());
+ }
+ }));
+
+ pipeline.run();
+
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery("select count(*) from BEAM")) {
+ resultSet.next();
+ int count = resultSet.getInt(1);
+
+ Assert.assertEquals(2000, count);
+ }
+ }
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWriteWithEmptyPCollection() throws Exception {
+ TestPipeline pipeline = TestPipeline.create();
+
+ pipeline.apply(Create.of(new ArrayList<KV<Integer, String>>()))
+ .apply(JdbcIO.<KV<Integer, String>>write()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ "org.apache.derby.jdbc.ClientDriver",
+ "jdbc:derby://localhost:1527/target/beam"))
+ .withStatement("insert into BEAM values(?, ?)")
+ .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+ public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+ throws Exception {
+ statement.setInt(1, element.getKey());
+ statement.setString(2, element.getValue());
+ }
+ }));
+
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index c4c32ed..82cf8d0 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -39,6 +39,7 @@
<module>kafka</module>
<module>kinesis</module>
<module>mongodb</module>
+ <module>jdbc</module>
</modules>
</project>
[2/2] incubator-beam git commit: [BEAM-244] This closes #942
Posted by jb...@apache.org.
[BEAM-244] This closes #942
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5c34365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5c34365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5c34365
Branch: refs/heads/master
Commit: c5c343659ea7a597b2b6a5fe7efcec001f17a8f9
Parents: bc80ee3 19fad18
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Sun Oct 2 10:37:27 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun Oct 2 10:37:27 2016 +0200
----------------------------------------------------------------------
sdks/java/io/jdbc/pom.xml | 138 ++++++
.../org/apache/beam/sdk/io/jdbc/JdbcIO.java | 427 +++++++++++++++++++
.../apache/beam/sdk/io/jdbc/package-info.java | 22 +
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 236 ++++++++++
sdks/java/io/pom.xml | 1 +
5 files changed, 824 insertions(+)
----------------------------------------------------------------------