Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/13 20:50:38 UTC
[1/2] beam git commit: JdbcIOIT now uses writeThenRead style
Repository: beam
Updated Branches:
refs/heads/master 5f972e8b2 -> 5fd2c6e13
JdbcIOIT now uses writeThenRead style
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6201ed1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6201ed1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6201ed1
Branch: refs/heads/master
Commit: a6201ed1488d9ae95637002744bc316f72401e56
Parents: 5f972e8
Author: Stephen Sisk <si...@google.com>
Authored: Fri Jun 16 11:04:07 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 13:43:27 2017 -0700
----------------------------------------------------------------------
sdks/java/io/common/pom.xml | 10 +
.../org/apache/beam/sdk/io/common/TestRow.java | 114 +++++++++++
sdks/java/io/jdbc/pom.xml | 10 +-
.../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 203 ++++++++++---------
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 115 ++++++-----
.../beam/sdk/io/jdbc/JdbcTestDataSet.java | 130 ------------
.../apache/beam/sdk/io/jdbc/JdbcTestHelper.java | 81 ++++++++
7 files changed, 377 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index df0d94b..1a6f54b 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -38,5 +38,15 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
new file mode 100644
index 0000000..5f0a2fb
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Used to pass values around within test pipelines.
+ */
+@AutoValue
+public abstract class TestRow implements Serializable, Comparable<TestRow> {
+ /**
+ * Manually create a test row.
+ */
+ public static TestRow create(Integer id, String name) {
+ return new AutoValue_TestRow(id, name);
+ }
+
+ public abstract Integer id();
+ public abstract String name();
+
+ public int compareTo(TestRow other) {
+ return id().compareTo(other.id());
+ }
+
+ /**
+ * Creates a {@link org.apache.beam.sdk.io.common.TestRow} from the seed value.
+ */
+ public static TestRow fromSeed(Integer seed) {
+ return create(seed, getNameForSeed(seed));
+ }
+
+ /**
+ * Returns the name field value produced from the given seed.
+ */
+ public static String getNameForSeed(Integer seed) {
+ return "Testval" + seed;
+ }
+
+ /**
+ * Returns a range of {@link org.apache.beam.sdk.io.common.TestRow}s for seed values between
+ * rangeStart (inclusive) and rangeEnd (exclusive).
+ */
+ public static Iterable<TestRow> getExpectedValues(int rangeStart, int rangeEnd) {
+ List<TestRow> ret = new ArrayList<TestRow>(rangeEnd - rangeStart + 1);
+ for (int i = rangeStart; i < rangeEnd; i++) {
+ ret.add(fromSeed(i));
+ }
+ return ret;
+ }
+
+ /**
+ * Uses the input Long values as seeds to produce {@link org.apache.beam.sdk.io.common.TestRow}s.
+ */
+ public static class DeterministicallyConstructTestRowFn extends DoFn<Long, TestRow> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(fromSeed(c.element().intValue()));
+ }
+ }
+
+ /**
+ * Outputs just the name stored in the {@link org.apache.beam.sdk.io.common.TestRow}.
+ */
+ public static class SelectNameFn extends DoFn<TestRow, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().name());
+ }
+ }
+
+ /**
+ * Precalculated hashes - you can calculate an entry by running HashingFn over the name()
+ * values of the rows generated from seeds in [0, n).
+ */
+ private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
+ 1000, "7d94d63a41164be058a9680002914358"
+ );
+
+ /**
+ * Returns the hash value that {@link org.apache.beam.sdk.io.common.HashingFn} will return when it
+ * is run on {@link org.apache.beam.sdk.io.common.TestRow}s produced by
+ * getExpectedValues(0, rowCount).
+ */
+ public static String getExpectedHashForRowCount(int rowCount)
+ throws UnsupportedOperationException {
+ String hash = EXPECTED_HASHES.get(rowCount);
+ if (hash == null) {
+ throw new UnsupportedOperationException("No hash for that row count");
+ }
+ return hash;
+ }
+}
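For context, a minimal sketch of how TestRow is meant to be wired into a test
pipeline, mirroring the wiring the updated JdbcIOIT (below) uses. This assumes a
TestPipeline named p is in scope and a runner that supports GenerateSequence:

    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.common.HashingFn;
    import org.apache.beam.sdk.io.common.TestRow;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Deterministically construct rows from seeds 0..999.
    PCollection<TestRow> rows = p
        .apply(GenerateSequence.from(0).to(1000))
        .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()));

    // Value-level check against the deterministic expected rows.
    PAssert.that(rows).containsInAnyOrder(TestRow.getExpectedValues(0, 1000));

    // Order-independent hash over the name() values, compared against the
    // precalculated entry for 1000 rows in EXPECTED_HASHES.
    PCollection<String> hash = rows
        .apply(ParDo.of(new TestRow.SelectNameFn()))
        .apply(Combine.globally(new HashingFn()).withoutDefaults());
    PAssert.that(hash).containsInAnyOrder(TestRow.getExpectedHashForRowCount(1000));

    p.run().waitUntilFinish();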
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 050fc6a..e5f4d7e 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -105,11 +105,6 @@
<version>2.1.1</version>
</dependency>
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
-
<!-- compile dependencies -->
<dependency>
<groupId>com.google.auto.value</groupId>
@@ -168,5 +163,10 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index e8ffad6..32d6d9e 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -17,41 +17,39 @@
*/
package org.apache.beam.sdk.io.jdbc;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
+import java.text.ParseException;
import java.util.List;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
*
- * <p>This test requires a running instance of Postgres, and the test dataset must exist in the
- * database. `JdbcTestDataSet` will create the read table.
- *
- * <p>You can run this test by doing the following:
+ * <p>This test requires a running instance of Postgres. Pass in connection information using
+ * PipelineOptions:
* <pre>
* mvn -e -Pio-it verify -pl sdks/java/io/jdbc -DintegrationTestPipelineOptions='[
* "--postgresServerName=1.2.3.4",
@@ -67,112 +65,123 @@ import org.postgresql.ds.PGSimpleDataSource;
*/
@RunWith(JUnit4.class)
public class JdbcIOIT {
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class);
+ public static final int EXPECTED_ROW_COUNT = 1000;
private static PGSimpleDataSource dataSource;
- private static String writeTableName;
+ private static String tableName;
+
+ @Rule
+ public TestPipeline pipelineWrite = TestPipeline.create();
+ @Rule
+ public TestPipeline pipelineRead = TestPipeline.create();
@BeforeClass
- public static void setup() throws SQLException {
+ public static void setup() throws SQLException, ParseException {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
.as(IOTestPipelineOptions.class);
- // We do dataSource set up in BeforeClass rather than Before since we don't need to create a new
- // dataSource for each test.
- dataSource = JdbcTestDataSet.getDataSource(options);
- }
+ dataSource = getDataSource(options);
- @AfterClass
- public static void tearDown() throws SQLException {
- // Only do write table clean up once for the class since we don't want to clean up after both
- // read and write tests, only want to do it once after all the tests are done.
- JdbcTestDataSet.cleanUpDataTable(dataSource, writeTableName);
+ tableName = JdbcTestHelper.getTableName("IT");
+ JdbcTestHelper.createDataTable(dataSource, tableName);
}
- private static class CreateKVOfNameAndId implements JdbcIO.RowMapper<KV<String, Integer>> {
- @Override
- public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
- KV<String, Integer> kv =
- KV.of(resultSet.getString("name"), resultSet.getInt("id"));
- return kv;
- }
- }
+ private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
+ throws SQLException {
+ PGSimpleDataSource dataSource = new PGSimpleDataSource();
- private static class PutKeyInColumnOnePutValueInColumnTwo
- implements JdbcIO.PreparedStatementSetter<KV<Integer, String>> {
- @Override
- public void setParameters(KV<Integer, String> element, PreparedStatement statement)
- throws SQLException {
- statement.setInt(1, element.getKey());
- statement.setString(2, element.getValue());
- }
+ dataSource.setDatabaseName(options.getPostgresDatabaseName());
+ dataSource.setServerName(options.getPostgresServerName());
+ dataSource.setPortNumber(options.getPostgresPort());
+ dataSource.setUser(options.getPostgresUsername());
+ dataSource.setPassword(options.getPostgresPassword());
+ dataSource.setSsl(options.getPostgresSsl());
+
+ return dataSource;
}
- @Rule
- public TestPipeline pipeline = TestPipeline.create();
+ @AfterClass
+ public static void tearDown() throws SQLException {
+ JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+ }
/**
- * Does a test read of a few rows from a postgres database.
- *
- * <p>Note that IT read tests must not do any data table manipulation (setup/clean up.)
- * @throws SQLException
+ * Tests writing then reading data in a postgres database.
*/
@Test
- public void testRead() throws SQLException {
- String writeTableName = JdbcTestDataSet.READ_TABLE_NAME;
+ public void testWriteThenRead() {
+ runWrite();
+ runRead();
+ }
- PCollection<KV<String, Integer>> output = pipeline.apply(JdbcIO.<KV<String, Integer>>read()
+ /**
+ * Writes the test dataset to postgres.
+ *
+ * <p>This method does not attempt to validate the data - we do so in the read test. This makes
+ * it harder to tell whether a test failed in the write or read phase, but the tests are much
+ * easier to maintain (no separate code is needed to load test data into the database for the
+ * read tests).
+ */
+ private void runWrite() {
+ pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT))
+ .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+ .apply(JdbcIO.<TestRow>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
- .withQuery("select name,id from " + writeTableName)
- .withRowMapper(new CreateKVOfNameAndId())
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-
- // TODO: validate actual contents of rows, not just count.
- PAssert.thatSingleton(
- output.apply("Count All", Count.<KV<String, Integer>>globally()))
- .isEqualTo(1000L);
+ .withStatement(String.format("insert into %s values(?, ?)", tableName))
+ .withPreparedStatementSetter(new JdbcTestHelper.PrepareStatementFromTestRow()));
- List<KV<String, Long>> expectedCounts = new ArrayList<>();
- for (String scientist : JdbcTestDataSet.SCIENTISTS) {
- expectedCounts.add(KV.of(scientist, 100L));
- }
- PAssert.that(output.apply("Count Scientist", Count.<String, Integer>perKey()))
- .containsInAnyOrder(expectedCounts);
-
- pipeline.run().waitUntilFinish();
+ pipelineWrite.run().waitUntilFinish();
}
/**
- * Tests writes to a postgres database.
+ * Reads the test dataset from postgres and validates its contents.
+ *
+ * <p>The validation aims to:
+ * 1. Verify that *all* the rows are correct.
+ * 2. Provide enough information in the assertions that obvious errors are easy to spot (e.g.
+ * every element has the same mistake, or "only 5 elements were generated" and the user wants
+ * to see what the problem was).
*
- * <p>Write Tests must clean up their data - in this case, it uses a new table every test run so
- * that it won't interfere with read tests/other write tests. It uses finally to attempt to
- * clean up data at the end of the test run.
- * @throws SQLException
+ * <p>We do not wish to generate and compare all of the expected values, so this method uses
+ * hashing to ensure that all expected data is present. However, hashing does not provide easy
+ * debugging information (failures like "every element was empty string" are hard to see),
+ * so we also:
+ * 1. Generate expected values for the first and last 500 rows
+ * 2. Use containsInAnyOrder to verify that their values are correct.
+ * The first/last 500 rows are well defined because every row has a unique id, so we can use
+ * the natural ordering of that key.
*/
- @Test
- public void testWrite() throws SQLException {
- writeTableName = JdbcTestDataSet.createWriteDataTable(dataSource);
-
- ArrayList<KV<Integer, String>> data = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
- KV<Integer, String> kv = KV.of(i, "Test");
- data.add(kv);
- }
- pipeline.apply(Create.of(data))
- .apply(JdbcIO.<KV<Integer, String>>write()
- .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
- .withStatement(String.format("insert into %s values(?, ?)", writeTableName))
- .withPreparedStatementSetter(new PutKeyInColumnOnePutValueInColumnTwo()));
-
- pipeline.run().waitUntilFinish();
-
- try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery("select count(*) from " + writeTableName)) {
- resultSet.next();
- int count = resultSet.getInt(1);
- Assert.assertEquals(2000, count);
- }
- // TODO: Actually verify contents of the rows.
+ private void runRead() {
+ PCollection<TestRow> namesAndIds =
+ pipelineRead.apply(JdbcIO.<TestRow>read()
+ .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withQuery(String.format("select name,id from %s;", tableName))
+ .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+ .withCoder(SerializableCoder.of(TestRow.class)));
+
+ PAssert.thatSingleton(
+ namesAndIds.apply("Count All", Count.<TestRow>globally()))
+ .isEqualTo((long) EXPECTED_ROW_COUNT);
+
+ PCollection<String> consolidatedHashcode = namesAndIds
+ .apply(ParDo.of(new TestRow.SelectNameFn()))
+ .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
+ PAssert.that(consolidatedHashcode)
+ .containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT));
+
+ PCollection<List<TestRow>> frontOfList =
+ namesAndIds.apply(Top.<TestRow>smallest(500));
+ Iterable<TestRow> expectedFrontOfList = TestRow.getExpectedValues(0, 500);
+ PAssert.thatSingletonIterable(frontOfList).containsInAnyOrder(expectedFrontOfList);
+
+ PCollection<List<TestRow>> backOfList =
+ namesAndIds.apply(Top.<TestRow>largest(500));
+ Iterable<TestRow> expectedBackOfList =
+ TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500,
+ EXPECTED_ROW_COUNT);
+ PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
+
+ pipelineRead.run().waitUntilFinish();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 984ce1a..4ea18ef 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.jdbc;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.PrintWriter;
@@ -28,18 +27,22 @@ import java.net.ServerSocket;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import java.util.Collections;
+import javax.sql.DataSource;
+
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.derby.drda.NetworkServerControl;
@@ -58,11 +61,13 @@ import org.slf4j.LoggerFactory;
*/
public class JdbcIOTest implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);
+ public static final int EXPECTED_ROW_COUNT = 1000;
private static NetworkServerControl derbyServer;
private static ClientDataSource dataSource;
private static int port;
+ private static String readTableName;
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@@ -108,14 +113,16 @@ public class JdbcIOTest implements Serializable {
dataSource.setServerName("localhost");
dataSource.setPortNumber(port);
+ readTableName = JdbcTestHelper.getTableName("UT_READ");
- JdbcTestDataSet.createReadDataTable(dataSource);
+ JdbcTestHelper.createDataTable(dataSource, readTableName);
+ addInitialData(dataSource, readTableName);
}
@AfterClass
public static void shutDownDatabase() throws Exception {
try {
- JdbcTestDataSet.cleanUpDataTable(dataSource, JdbcTestDataSet.READ_TABLE_NAME);
+ JdbcTestHelper.cleanUpDataTable(dataSource, readTableName);
} finally {
if (derbyServer != null) {
derbyServer.shutdown();
@@ -177,39 +184,43 @@ public class JdbcIOTest implements Serializable {
}
}
+ /**
+ * Creates test data consistent with the rows that TestRow generates.
+ */
+ private static void addInitialData(DataSource dataSource, String tableName)
+ throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ connection.setAutoCommit(false);
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(
+ String.format("insert into %s values (?,?)", tableName))) {
+ for (int i = 0; i < EXPECTED_ROW_COUNT; i++) {
+ preparedStatement.clearParameters();
+ preparedStatement.setInt(1, i);
+ preparedStatement.setString(2, TestRow.getNameForSeed(i));
+ preparedStatement.executeUpdate();
+ }
+ }
+ connection.commit();
+ }
+ }
+
@Test
@Category(NeedsRunner.class)
public void testRead() throws Exception {
-
- PCollection<KV<String, Integer>> output = pipeline.apply(
- JdbcIO.<KV<String, Integer>>read()
+ PCollection<TestRow> rows = pipeline.apply(
+ JdbcIO.<TestRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
- .withQuery("select name,id from " + JdbcTestDataSet.READ_TABLE_NAME)
- .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
- @Override
- public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
- KV<String, Integer> kv =
- KV.of(resultSet.getString("name"), resultSet.getInt("id"));
- return kv;
- }
- })
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+ .withQuery("select name,id from " + readTableName)
+ .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+ .withCoder(SerializableCoder.of(TestRow.class)));
PAssert.thatSingleton(
- output.apply("Count All", Count.<KV<String, Integer>>globally()))
- .isEqualTo(1000L);
-
- PAssert.that(output
- .apply("Count Scientist", Count.<String, Integer>perKey())
- ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
- @Override
- public Void apply(Iterable<KV<String, Long>> input) {
- for (KV<String, Long> element : input) {
- assertEquals(element.getKey(), 100L, element.getValue().longValue());
- }
- return null;
- }
- });
+ rows.apply("Count All", Count.<TestRow>globally()))
+ .isEqualTo((long) EXPECTED_ROW_COUNT);
+
+ Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0, EXPECTED_ROW_COUNT);
+ PAssert.that(rows).containsInAnyOrder(expectedValues);
pipeline.run();
}
@@ -217,32 +228,27 @@ public class JdbcIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
public void testReadWithSingleStringParameter() throws Exception {
-
- PCollection<KV<String, Integer>> output = pipeline.apply(
- JdbcIO.<KV<String, Integer>>read()
+ PCollection<TestRow> rows = pipeline.apply(
+ JdbcIO.<TestRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
.withQuery(String.format("select name,id from %s where name = ?",
- JdbcTestDataSet.READ_TABLE_NAME))
+ readTableName))
.withStatementPreparator(new JdbcIO.StatementPreparator() {
@Override
public void setParameters(PreparedStatement preparedStatement)
- throws Exception {
- preparedStatement.setString(1, "Darwin");
- }
- })
- .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
- @Override
- public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
- KV<String, Integer> kv =
- KV.of(resultSet.getString("name"), resultSet.getInt("id"));
- return kv;
+ throws Exception {
+ preparedStatement.setString(1, TestRow.getNameForSeed(1));
}
})
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+ .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+ .withCoder(SerializableCoder.of(TestRow.class)));
PAssert.thatSingleton(
- output.apply("Count One Scientist", Count.<KV<String, Integer>>globally()))
- .isEqualTo(100L);
+ rows.apply("Count All", Count.<TestRow>globally()))
+ .isEqualTo(1L);
+
+ Iterable<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed(1));
+ PAssert.that(rows).containsInAnyOrder(expectedValues);
pipeline.run();
}
@@ -250,11 +256,13 @@ public class JdbcIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
public void testWrite() throws Exception {
+ final long rowsToAdd = 1000L;
- String tableName = JdbcTestDataSet.createWriteDataTable(dataSource);
+ String tableName = JdbcTestHelper.getTableName("UT_WRITE");
+ JdbcTestHelper.createDataTable(dataSource, tableName);
try {
ArrayList<KV<Integer, String>> data = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < rowsToAdd; i++) {
KV<Integer, String> kv = KV.of(i, "Test");
data.add(kv);
}
@@ -282,19 +290,18 @@ public class JdbcIOTest implements Serializable {
resultSet.next();
int count = resultSet.getInt(1);
- Assert.assertEquals(2000, count);
+ Assert.assertEquals(EXPECTED_ROW_COUNT, count);
}
}
}
} finally {
- JdbcTestDataSet.cleanUpDataTable(dataSource, tableName);
+ JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
}
}
@Test
@Category(NeedsRunner.class)
public void testWriteWithEmptyPCollection() throws Exception {
-
pipeline
.apply(Create.empty(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())))
.apply(JdbcIO.<KV<Integer, String>>write()
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
deleted file mode 100644
index 0b88be2..0000000
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import javax.sql.DataSource;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.postgresql.ds.PGSimpleDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manipulates test data used by the {@link org.apache.beam.sdk.io.jdbc.JdbcIO} tests.
- *
- * <p>This is independent from the tests so that for read tests it can be run separately after data
- * store creation rather than every time (which can be more fragile.)
- */
-public class JdbcTestDataSet {
- private static final Logger LOG = LoggerFactory.getLogger(JdbcTestDataSet.class);
- public static final String[] SCIENTISTS = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie",
- "Faraday", "McClintock", "Herschel", "Hopper", "Lovelace"};
- /**
- * Use this to create the read tables before IT read tests.
- *
- * <p>To invoke this class, you can use this command line:
- * (run from the jdbc root directory)
- * mvn test-compile exec:java -Dexec.mainClass=org.apache.beam.sdk.io.jdbc.JdbcTestDataSet \
- * -Dexec.args="--postgresServerName=127.0.0.1 --postgresUsername=postgres \
- * --postgresDatabaseName=myfancydb \
- * --postgresPassword=yourpassword --postgresSsl=false" \
- * -Dexec.classpathScope=test
- * @param args Please pass options from IOTestPipelineOptions used for connection to postgres as
- * shown above.
- */
- public static void main(String[] args) throws SQLException {
- PipelineOptionsFactory.register(IOTestPipelineOptions.class);
- IOTestPipelineOptions options =
- PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
-
- createReadDataTable(getDataSource(options));
- }
-
- public static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
- throws SQLException {
- PGSimpleDataSource dataSource = new PGSimpleDataSource();
-
- // Tests must receive parameters for connections from PipelineOptions
- // Parameters should be generic to all tests that use a particular datasource, not
- // the particular test.
- dataSource.setDatabaseName(options.getPostgresDatabaseName());
- dataSource.setServerName(options.getPostgresServerName());
- dataSource.setPortNumber(options.getPostgresPort());
- dataSource.setUser(options.getPostgresUsername());
- dataSource.setPassword(options.getPostgresPassword());
- dataSource.setSsl(options.getPostgresSsl());
-
- return dataSource;
- }
-
- public static final String READ_TABLE_NAME = "BEAM_TEST_READ";
-
- public static void createReadDataTable(DataSource dataSource) throws SQLException {
- createDataTable(dataSource, READ_TABLE_NAME);
- }
-
- public static String createWriteDataTable(DataSource dataSource) throws SQLException {
- String tableName = "BEAMTEST" + org.joda.time.Instant.now().getMillis();
- createDataTable(dataSource, tableName);
- return tableName;
- }
-
- private static void createDataTable(DataSource dataSource, String tableName) throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- // something like this will need to happen in tests on a newly created postgres server,
- // but likely it will happen in perfkit, not here
- // alternatively, we may have a pipelineoption indicating whether we want to
- // re-use the database or create a new one
- try (Statement statement = connection.createStatement()) {
- statement.execute(
- String.format("create table %s (id INT, name VARCHAR(500))", tableName));
- }
-
- connection.setAutoCommit(false);
- try (PreparedStatement preparedStatement =
- connection.prepareStatement(
- String.format("insert into %s values (?,?)", tableName))) {
- for (int i = 0; i < 1000; i++) {
- int index = i % SCIENTISTS.length;
- preparedStatement.clearParameters();
- preparedStatement.setInt(1, i);
- preparedStatement.setString(2, SCIENTISTS[index]);
- preparedStatement.executeUpdate();
- }
- }
- connection.commit();
- }
-
- LOG.info("Created table {}", tableName);
- }
-
- public static void cleanUpDataTable(DataSource dataSource, String tableName)
- throws SQLException {
- if (tableName != null) {
- try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
- statement.executeUpdate(String.format("drop table %s", tableName));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
new file mode 100644
index 0000000..fedae51
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.TestRow;
+
+/**
+ * Contains test helper methods used by both the integration and unit tests of
+ * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ */
+class JdbcTestHelper {
+ static String getTableName(String testIdentifier) throws ParseException {
+ SimpleDateFormat formatter = new SimpleDateFormat();
+ formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
+ return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
+ }
+
+ static void createDataTable(
+ DataSource dataSource, String tableName)
+ throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format("create table %s (id INT, name VARCHAR(500))", tableName));
+ }
+ }
+ }
+
+ static void cleanUpDataTable(DataSource dataSource, String tableName)
+ throws SQLException {
+ if (tableName != null) {
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.executeUpdate(String.format("drop table %s", tableName));
+ }
+ }
+ }
+
+ static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
+ @Override
+ public TestRow mapRow(ResultSet resultSet) throws Exception {
+ return TestRow.create(
+ resultSet.getInt("id"), resultSet.getString("name"));
+ }
+ }
+
+ static class PrepareStatementFromTestRow
+ implements JdbcIO.PreparedStatementSetter<TestRow> {
+ @Override
+ public void setParameters(TestRow element, PreparedStatement statement)
+ throws SQLException {
+ statement.setLong(1, element.id());
+ statement.setString(2, element.name());
+ }
+ }
+
+}
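As a quick illustration of the naming scheme: getTableName appends a timestamp to
the test identifier, so each run gets a fresh table. The timestamp below is
hypothetical - the actual suffix depends on the wall clock at run time:

    // Hypothetical output; the suffix varies with the clock.
    String tableName = JdbcTestHelper.getTableName("UT_WRITE");
    // e.g. "BEAMTEST_UT_WRITE_2017_07_13_13_43_27_123"

Since the class is package-private, these helpers are reachable only from tests in
org.apache.beam.sdk.io.jdbc.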
[2/2] beam git commit: This closes #2507: JdbcIOIT now uses writeThenRead style
Posted by ke...@apache.org.
This closes #2507: JdbcIOIT now uses writeThenRead style
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fd2c6e1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fd2c6e1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fd2c6e1
Branch: refs/heads/master
Commit: 5fd2c6e139387d3bf1a297adaf5dc4687bcda7ee
Parents: 5f972e8 a6201ed
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 13 13:43:36 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 13:43:36 2017 -0700
----------------------------------------------------------------------
sdks/java/io/common/pom.xml | 10 +
.../org/apache/beam/sdk/io/common/TestRow.java | 114 +++++++++++
sdks/java/io/jdbc/pom.xml | 10 +-
.../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 203 ++++++++++---------
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 115 ++++++-----
.../beam/sdk/io/jdbc/JdbcTestDataSet.java | 130 ------------
.../apache/beam/sdk/io/jdbc/JdbcTestHelper.java | 81 ++++++++
7 files changed, 377 insertions(+), 286 deletions(-)
----------------------------------------------------------------------