You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/01/18 18:50:23 UTC
[beam] branch master updated: [BEAM-3217] add HadoopInputFormatIO
integration test using DBInputFormat (#4332)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2a96c1c [BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat (#4332)
2a96c1c is described below
commit 2a96c1cfeee0fd375ebdba815ab2cc2657c014ec
Author: Łukasz Gajowy <lu...@gmail.com>
AuthorDate: Thu Jan 18 19:50:19 2018 +0100
[BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat (#4332)
The Kubernetes infrastructure needed to run the Jenkins job
is not yet available. We should add the job once
the infrastructure is in place.
---
sdks/java/io/common/pom.xml | 5 +
.../beam/sdk/io/common/DatabaseTestHelper.java} | 57 +++---
.../org/apache/beam/sdk/io/common/TestRow.java | 1 +
sdks/java/io/hadoop/input-format/pom.xml | 212 +++++++++++++++++++++
.../hadoop/inputformat/HadoopInputFormatIOIT.java | 147 ++++++++++++++
.../io/hadoop/inputformat/TestRowDBWritable.java | 83 ++++++++
sdks/java/io/jdbc/pom.xml | 1 -
.../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 27 +--
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 13 +-
.../apache/beam/sdk/io/jdbc/JdbcTestHelper.java | 35 +---
sdks/java/io/pom.xml | 11 ++
11 files changed, 500 insertions(+), 92 deletions(-)
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index eb79091..bd143d4 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -48,5 +48,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
similarity index 59%
copy from sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
copy to sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index fedae51..d69654a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -15,32 +15,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.jdbc;
+package org.apache.beam.sdk.io.common;
import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.sql.DataSource;
-import org.apache.beam.sdk.io.common.TestRow;
+import org.postgresql.ds.PGSimpleDataSource;
/**
- * Contains Test helper methods used by both Integration and Unit Tests in
- * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ * This class contains helper methods to ease database usage in tests.
*/
-class JdbcTestHelper {
- static String getTableName(String testIdentifier) throws ParseException {
- SimpleDateFormat formatter = new SimpleDateFormat();
- formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
- return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
+public class DatabaseTestHelper {
+
+ public static PGSimpleDataSource getPostgresDataSource(IOTestPipelineOptions options) {
+ PGSimpleDataSource dataSource = new PGSimpleDataSource();
+ dataSource.setDatabaseName(options.getPostgresDatabaseName());
+ dataSource.setServerName(options.getPostgresServerName());
+ dataSource.setPortNumber(options.getPostgresPort());
+ dataSource.setUser(options.getPostgresUsername());
+ dataSource.setPassword(options.getPostgresPassword());
+ dataSource.setSsl(options.getPostgresSsl());
+ return dataSource;
}
- static void createDataTable(
- DataSource dataSource, String tableName)
+ public static void createTable(DataSource dataSource, String tableName)
throws SQLException {
try (Connection connection = dataSource.getConnection()) {
try (Statement statement = connection.createStatement()) {
@@ -50,7 +51,7 @@ class JdbcTestHelper {
}
}
- static void cleanUpDataTable(DataSource dataSource, String tableName)
+ public static void deleteTable(DataSource dataSource, String tableName)
throws SQLException {
if (tableName != null) {
try (Connection connection = dataSource.getConnection();
@@ -60,22 +61,18 @@ class JdbcTestHelper {
}
}
- static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
- @Override
- public TestRow mapRow(ResultSet resultSet) throws Exception {
- return TestRow.create(
- resultSet.getInt("id"), resultSet.getString("name"));
- }
+ public static String getTestTableName(String testIdentifier) {
+ SimpleDateFormat formatter = new SimpleDateFormat();
+ formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
+ return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
}
- static class PrepareStatementFromTestRow
- implements JdbcIO.PreparedStatementSetter<TestRow> {
- @Override
- public void setParameters(TestRow element, PreparedStatement statement)
- throws SQLException {
- statement.setLong(1, element.id());
- statement.setString(2, element.name());
- }
+ public static String getPostgresDBUrl(IOTestPipelineOptions options) {
+ return String.format(
+ "jdbc:postgresql://%s:%s/%s",
+ options.getPostgresServerName(),
+ options.getPostgresPort(),
+ options.getPostgresDatabaseName()
+ );
}
-
}
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index 79a144d..a276869 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -97,6 +97,7 @@ public abstract class TestRow implements Serializable, Comparable<TestRow> {
private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
1000, "7d94d63a41164be058a9680002914358",
100_000, "c7cbddb319209e200f1c5eebef8fe960",
+ 600_000, "e2add2f680de9024e9bc46cd3912545e",
5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
);
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index c698b40..931e11e 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -27,6 +27,197 @@
<name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format</name>
<description>IO to read data from data sources which implement Hadoop Input Format.</description>
+ <profiles>
+ <!-- Include the Google Cloud Dataflow runner activated by -DintegrationTestRunner=dataflow -->
+ <profile>
+ <id>dataflow-runner</id>
+ <activation>
+ <property>
+ <name>integrationTestRunner</name>
+ <value>dataflow</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!--
+ This profile invokes PerfKitBenchmarker, which does benchmarking of
+ the IO ITs. The arguments passed to it allow it to invoke mvn again
+ with the desired benchmark.
+
+ To invoke this, run:
+
+ mvn verify -Dio-it-suite -pl sdks/java/io/hadoop/input-format
+ -DpkbLocation="path-to-pkb.py" \
+ -DintegrationTestPipelineOptions='["-tempRoot=gs://bucket/staging"]'
+ -->
+ <profile>
+ <id>io-it-suite</id>
+ <activation>
+ <property><name>io-it-suite</name></property>
+ </activation>
+ <properties>
+ <!-- This is based on the location of the current pom relative to the root
+ See discussion in BEAM-2460 -->
+ <beamRootProjectDir>${project.parent.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.gmaven</groupId>
+ <artifactId>groovy-maven-plugin</artifactId>
+ <version>${groovy-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>find-supported-python-for-compile</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>execute</goal>
+ </goals>
+ <configuration>
+ <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${maven-exec-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>${python.interpreter.bin}</executable>
+ <arguments>
+ <argument>${pkbLocation}</argument>
+ <argument>-benchmarks=beam_integration_benchmark</argument>
+ <argument>-beam_it_profile=io-it</argument>
+ <argument>-beam_location=${beamRootProjectDir}</argument>
+ <argument>-beam_prebuilt=true</argument>
+ <argument>-beam_sdk=java</argument>
+ <argument>-kubeconfig=${kubeconfig}</argument>
+ <argument>-kubectl=${kubectl}</argument>
+ <!-- runner overrides, controlled via forceDirectRunner -->
+ <argument>${pkbBeamRunnerProfile}</argument>
+ <argument>${pkbBeamRunnerOption}</argument>
+ <!-- specific to this IO -->
+ <argument>-beam_it_module=sdks/java/io/hadoop/input-format</argument>
+ <argument>-beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT</argument>
+ <argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config.yml</argument>
+ <argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml</argument>
+ <!-- arguments typically defined by user -->
+ <argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire-plugin.version}</version>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!--
+ io-it-suite-local overrides part of io-it-suite, allowing users to run tests
+ when they are on a separate network from the kubernetes cluster by
+ creating a LoadBalancer service.
+ -->
+ <profile>
+ <id>io-it-suite-local</id>
+ <activation><property><name>io-it-suite-local</name></property></activation>
+ <properties>
+ <!-- This is based on the location of the current pom relative to the root
+ See discussion in BEAM-2460 -->
+ <beamRootProjectDir>${project.parent.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.gmaven</groupId>
+ <artifactId>groovy-maven-plugin</artifactId>
+ <version>${groovy-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>find-supported-python-for-compile</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>execute</goal>
+ </goals>
+ <configuration>
+ <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${maven-exec-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>${python.interpreter.bin}</executable>
+ <arguments>
+ <argument>${pkbLocation}</argument>
+ <argument>-benchmarks=beam_integration_benchmark</argument>
+ <argument>-beam_it_profile=io-it</argument>
+ <argument>-beam_location=${beamRootProjectDir}</argument>
+ <argument>-beam_prebuilt=true</argument>
+ <argument>-beam_sdk=java</argument>
+ <argument>-kubeconfig=${kubeconfig}</argument>
+ <argument>-kubectl=${kubectl}</argument>
+ <!-- runner overrides, controlled via forceDirectRunner -->
+ <argument>${pkbBeamRunnerProfile}</argument>
+ <argument>${pkbBeamRunnerOption}</argument>
+ <!-- specific to this IO -->
+ <argument>-beam_it_module=sdks/java/io/hadoop/input-format</argument>
+ <argument>-beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT</argument>
+ <argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config-local.yml</argument>
+ <argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml,${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml</argument>
+ <!-- arguments typically defined by user -->
+ <argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire-plugin.version}</version>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
@@ -70,9 +261,30 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
new file mode 100644
index 0000000..397aa3a
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import static org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn;
+import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn;
+import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable.PrepareStatementFromTestRow;
+
+import java.sql.SQLException;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO}
+ * on an independent postgres instance.
+ *
+ * <p>This test requires a running instance of Postgres. Pass in connection information using
+ * PipelineOptions:
+ * <pre>
+ * mvn -e -Pio-it verify -pl sdks/java/io/hadoop/input-format/ -DintegrationTestPipelineOptions='[
+ * "--postgresServerName=1.2.3.4",
+ * "--postgresUsername=postgres",
+ * "--postgresDatabaseName=myfancydb",
+ * "--postgresPassword=mypass",
+ * "--postgresSsl=false",
+ * "--numberOfRecords=1000" ]'
+ * </pre>
+ */
+public class HadoopInputFormatIOIT {
+
+ private static PGSimpleDataSource dataSource;
+ private static Integer numberOfRows;
+ private static String tableName;
+ private static SerializableConfiguration hadoopConfiguration;
+
+ @Rule
+ public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule
+ public TestPipeline readPipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void setUp() throws SQLException {
+ PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+ IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+ .as(IOTestPipelineOptions.class);
+
+ dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+ numberOfRows = options.getNumberOfRecords();
+ tableName = DatabaseTestHelper.getTestTableName("HadoopInputFormatIOIT");
+
+ DatabaseTestHelper.createTable(dataSource, tableName);
+ setupHadoopConfiguration(options);
+ }
+
+ private static void setupHadoopConfiguration(IOTestPipelineOptions options) {
+ Configuration conf = new Configuration();
+ DBConfiguration.configureDB(
+ conf,
+ "org.postgresql.Driver",
+ DatabaseTestHelper.getPostgresDBUrl(options),
+ options.getPostgresUsername(),
+ options.getPostgresPassword()
+ );
+ conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+ conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class, DBWritable.class);
+
+ conf.setClass("key.class", LongWritable.class, Object.class);
+ conf.setClass("value.class", TestRowDBWritable.class, Object.class);
+ conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);
+
+ hadoopConfiguration = new SerializableConfiguration(conf);
+ }
+
+ @AfterClass
+ public static void tearDown() throws SQLException {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+
+ @Test
+ public void readUsingHadoopInputFormat() {
+ writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
+ .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
+ .apply("Prevent fusion before writing", Reshuffle.<TestRow>viaRandomKey())
+ .apply("Write using JDBCIO", JdbcIO.<TestRow>write()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withStatement(String.format("insert into %s values(?, ?)", tableName))
+ .withPreparedStatementSetter(new PrepareStatementFromTestRow()));
+
+ writePipeline.run().waitUntilFinish();
+
+ PCollection<String> consolidatedHashcode = readPipeline
+ .apply("Read using HadoopInputFormat", HadoopInputFormatIO
+ .<LongWritable, TestRowDBWritable>read()
+ .withConfiguration(hadoopConfiguration.get()))
+ .apply("Get values only", Values.<TestRowDBWritable>create())
+ .apply("Values as string", ParDo.of(new SelectNameFn()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+ PAssert.thatSingleton(consolidatedHashcode)
+ .isEqualTo(getExpectedHashForRowCount(numberOfRows));
+
+ readPipeline.run().waitUntilFinish();
+ }
+}
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
new file mode 100644
index 0000000..c9b2639
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * A subclass of {@link org.apache.beam.sdk.io.common.TestRow} to be used with
+ * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
+ */
+@DefaultCoder(AvroCoder.class)
+public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+
+ private Integer id;
+ private String name;
+
+ @Override public Integer id() {
+ return id;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ statement.setInt(1, id);
+ statement.setString(2, name);
+ }
+
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+ id = resultSet.getInt(1);
+ name = resultSet.getString(2);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(id);
+ out.writeChars(name);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readInt();
+ name = in.readUTF();
+ }
+
+ static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
+ @Override
+ public void setParameters(TestRow element, PreparedStatement statement)
+ throws SQLException {
+ statement.setLong(1, element.id());
+ statement.setString(2, element.name());
+ }
+ }
+}
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 73fbc52..6264e9e 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -328,7 +328,6 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
- <version>9.4.1212.jre7</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 941a775..ed169c7 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -18,11 +18,10 @@
package org.apache.beam.sdk.io.jdbc;
import java.sql.SQLException;
-import java.text.ParseException;
import java.util.List;
-
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
@@ -74,34 +73,20 @@ public class JdbcIOIT {
public TestPipeline pipelineRead = TestPipeline.create();
@BeforeClass
- public static void setup() throws SQLException, ParseException {
+ public static void setup() throws SQLException {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
.as(IOTestPipelineOptions.class);
numberOfRows = options.getNumberOfRecords();
- dataSource = getDataSource(options);
-
- tableName = JdbcTestHelper.getTableName("IT");
- JdbcTestHelper.createDataTable(dataSource, tableName);
- }
-
- private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) {
- PGSimpleDataSource dataSource = new PGSimpleDataSource();
-
- dataSource.setDatabaseName(options.getPostgresDatabaseName());
- dataSource.setServerName(options.getPostgresServerName());
- dataSource.setPortNumber(options.getPostgresPort());
- dataSource.setUser(options.getPostgresUsername());
- dataSource.setPassword(options.getPostgresPassword());
- dataSource.setSsl(options.getPostgresSsl());
-
- return dataSource;
+ dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+ tableName = DatabaseTestHelper.getTestTableName("IT");
+ DatabaseTestHelper.createTable(dataSource, tableName);
}
@AfterClass
public static void tearDown() throws SQLException {
- JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
}
/**
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index f35c8b1..4871f20 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -111,16 +112,16 @@ public class JdbcIOTest implements Serializable {
dataSource.setServerName("localhost");
dataSource.setPortNumber(port);
- readTableName = JdbcTestHelper.getTableName("UT_READ");
+ readTableName = DatabaseTestHelper.getTestTableName("UT_READ");
- JdbcTestHelper.createDataTable(dataSource, readTableName);
+ DatabaseTestHelper.createTable(dataSource, readTableName);
addInitialData(dataSource, readTableName);
}
@AfterClass
public static void shutDownDatabase() throws Exception {
try {
- JdbcTestHelper.cleanUpDataTable(dataSource, readTableName);
+ DatabaseTestHelper.deleteTable(dataSource, readTableName);
} finally {
if (derbyServer != null) {
derbyServer.shutdown();
@@ -253,8 +254,8 @@ public class JdbcIOTest implements Serializable {
public void testWrite() throws Exception {
final long rowsToAdd = 1000L;
- String tableName = JdbcTestHelper.getTableName("UT_WRITE");
- JdbcTestHelper.createDataTable(dataSource, tableName);
+ String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ DatabaseTestHelper.createTable(dataSource, tableName);
try {
ArrayList<KV<Integer, String>> data = new ArrayList<>();
for (int i = 0; i < rowsToAdd; i++) {
@@ -290,7 +291,7 @@ public class JdbcIOTest implements Serializable {
}
}
} finally {
- JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
}
}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
index fedae51..1824989 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
@@ -17,15 +17,9 @@
*/
package org.apache.beam.sdk.io.jdbc;
-import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import javax.sql.DataSource;
import org.apache.beam.sdk.io.common.TestRow;
/**
@@ -33,32 +27,6 @@ import org.apache.beam.sdk.io.common.TestRow;
* {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
*/
class JdbcTestHelper {
- static String getTableName(String testIdentifier) throws ParseException {
- SimpleDateFormat formatter = new SimpleDateFormat();
- formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
- return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
- }
-
- static void createDataTable(
- DataSource dataSource, String tableName)
- throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("create table %s (id INT, name VARCHAR(500))", tableName));
- }
- }
- }
-
- static void cleanUpDataTable(DataSource dataSource, String tableName)
- throws SQLException {
- if (tableName != null) {
- try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
- statement.executeUpdate(String.format("drop table %s", tableName));
- }
- }
- }
static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
@Override
@@ -68,8 +36,7 @@ class JdbcTestHelper {
}
}
- static class PrepareStatementFromTestRow
- implements JdbcIO.PreparedStatementSetter<TestRow> {
+ static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
@Override
public void setParameters(TestRow element, PreparedStatement statement)
throws SQLException {
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 0710df0..d643775 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -127,4 +127,15 @@
</properties>
</profile>
</profiles>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>9.4.1212.jre7</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</project>
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].