You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink and is not preserved in this text.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/13 20:50:38 UTC

[1/2] beam git commit: JdbcIOIT now uses writeThenRead style

Repository: beam
Updated Branches:
  refs/heads/master 5f972e8b2 -> 5fd2c6e13


JdbcIOIT now uses writeThenRead style


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6201ed1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6201ed1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6201ed1

Branch: refs/heads/master
Commit: a6201ed1488d9ae95637002744bc316f72401e56
Parents: 5f972e8
Author: Stephen Sisk <si...@google.com>
Authored: Fri Jun 16 11:04:07 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 13:43:27 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/common/pom.xml                     |  10 +
 .../org/apache/beam/sdk/io/common/TestRow.java  | 114 +++++++++++
 sdks/java/io/jdbc/pom.xml                       |  10 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java   | 203 ++++++++++---------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 115 ++++++-----
 .../beam/sdk/io/jdbc/JdbcTestDataSet.java       | 130 ------------
 .../apache/beam/sdk/io/jdbc/JdbcTestHelper.java |  81 ++++++++
 7 files changed, 377 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index df0d94b..1a6f54b 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -38,5 +38,15 @@
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
       </dependency>
+      <dependency>
+        <groupId>com.google.auto.value</groupId>
+        <artifactId>auto-value</artifactId>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
new file mode 100644
index 0000000..5f0a2fb
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Used to pass values around within test pipelines.
+ */
+@AutoValue
+public abstract class TestRow implements Serializable, Comparable<TestRow> {
+  /**
+   * Manually create a test row.
+   */
+  public static TestRow create(Integer id, String name) {
+    return new AutoValue_TestRow(id, name);
+  }
+
+  public abstract Integer id();
+  public abstract String name();
+
+  public int compareTo(TestRow other) {
+    return id().compareTo(other.id());
+  }
+
+  /**
+   * Creates a {@link org.apache.beam.sdk.io.common.TestRow} from the seed value.
+   */
+  public static TestRow fromSeed(Integer seed) {
+    return create(seed, getNameForSeed(seed));
+  }
+
+  /**
+   * Returns the name field value produced from the given seed.
+   */
+  public static String getNameForSeed(Integer seed) {
+    return "Testval" + seed;
+  }
+
+  /**
+   * Returns a range of {@link org.apache.beam.sdk.io.common.TestRow}s for seed values between
+   * rangeStart (inclusive) and rangeEnd (exclusive).
+   */
+  public static Iterable<TestRow> getExpectedValues(int rangeStart, int rangeEnd) {
+    List<TestRow> ret = new ArrayList<TestRow>(rangeEnd - rangeStart + 1);
+    for (int i = rangeStart; i < rangeEnd; i++) {
+      ret.add(fromSeed(i));
+    }
+    return ret;
+  }
+
+  /**
+   * Uses the input Long values as seeds to produce {@link org.apache.beam.sdk.io.common.TestRow}s.
+   */
+  public static class DeterministicallyConstructTestRowFn extends DoFn<Long, TestRow> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(fromSeed(c.element().intValue()));
+    }
+  }
+
+  /**
+   * Outputs just the name stored in the {@link org.apache.beam.sdk.io.common.TestRow}.
+   */
+  public static class SelectNameFn extends DoFn<TestRow, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().name());
+    }
+  }
+
+  /**
+   * Precalculated hashes - you can calculate an entry by running HashingFn on
+   * the name() for the rows generated from seeds in [0, n).
+   */
+  private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
+      1000, "7d94d63a41164be058a9680002914358"
+  );
+
+  /**
+   * Returns the hash value that {@link org.apache.beam.sdk.io.common.HashingFn} will return when it
+   * is run on {@link org.apache.beam.sdk.io.common.TestRow}s produced by
+   * getExpectedValues(0, rowCount).
+   */
+  public static String getExpectedHashForRowCount(int rowCount)
+      throws UnsupportedOperationException {
+    String hash = EXPECTED_HASHES.get(rowCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException("No hash for that row count");
+    }
+    return hash;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 050fc6a..e5f4d7e 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -105,11 +105,6 @@
       <version>2.1.1</version>
     </dependency>
 
-    <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-
     <!-- compile dependencies -->
     <dependency>
       <groupId>com.google.auto.value</groupId>
@@ -168,5 +163,10 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index e8ffad6..32d6d9e 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -17,41 +17,39 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
+import java.text.ParseException;
 import java.util.List;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
  *
- * <p>This test requires a running instance of Postgres, and the test dataset must exist in the
- * database. `JdbcTestDataSet` will create the read table.
- *
- * <p>You can run this test by doing the following:
+ * <p>This test requires a running instance of Postgres. Pass in connection information using
+ * PipelineOptions:
  * <pre>
  *  mvn -e -Pio-it verify -pl sdks/java/io/jdbc -DintegrationTestPipelineOptions='[
  *  "--postgresServerName=1.2.3.4",
@@ -67,112 +65,123 @@ import org.postgresql.ds.PGSimpleDataSource;
  */
 @RunWith(JUnit4.class)
 public class JdbcIOIT {
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class);
+  public static final int EXPECTED_ROW_COUNT = 1000;
   private static PGSimpleDataSource dataSource;
-  private static String writeTableName;
+  private static String tableName;
+
+  @Rule
+  public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule
+  public TestPipeline pipelineRead = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws SQLException {
+  public static void setup() throws SQLException, ParseException {
     PipelineOptionsFactory.register(IOTestPipelineOptions.class);
     IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
         .as(IOTestPipelineOptions.class);
 
-    // We do dataSource set up in BeforeClass rather than Before since we don't need to create a new
-    // dataSource for each test.
-    dataSource = JdbcTestDataSet.getDataSource(options);
-  }
+    dataSource = getDataSource(options);
 
-  @AfterClass
-  public static void tearDown() throws SQLException {
-    // Only do write table clean up once for the class since we don't want to clean up after both
-    // read and write tests, only want to do it once after all the tests are done.
-    JdbcTestDataSet.cleanUpDataTable(dataSource, writeTableName);
+    tableName = JdbcTestHelper.getTableName("IT");
+    JdbcTestHelper.createDataTable(dataSource, tableName);
   }
 
-  private static class CreateKVOfNameAndId implements JdbcIO.RowMapper<KV<String, Integer>> {
-    @Override
-    public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
-      KV<String, Integer> kv =
-          KV.of(resultSet.getString("name"), resultSet.getInt("id"));
-      return kv;
-    }
-  }
+  private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
+      throws SQLException {
+    PGSimpleDataSource dataSource = new PGSimpleDataSource();
 
-  private static class PutKeyInColumnOnePutValueInColumnTwo
-      implements JdbcIO.PreparedStatementSetter<KV<Integer, String>> {
-    @Override
-    public void setParameters(KV<Integer, String> element, PreparedStatement statement)
-                    throws SQLException {
-      statement.setInt(1, element.getKey());
-      statement.setString(2, element.getValue());
-    }
+    dataSource.setDatabaseName(options.getPostgresDatabaseName());
+    dataSource.setServerName(options.getPostgresServerName());
+    dataSource.setPortNumber(options.getPostgresPort());
+    dataSource.setUser(options.getPostgresUsername());
+    dataSource.setPassword(options.getPostgresPassword());
+    dataSource.setSsl(options.getPostgresSsl());
+
+    return dataSource;
   }
 
-  @Rule
-  public TestPipeline pipeline = TestPipeline.create();
+  @AfterClass
+  public static void tearDown() throws SQLException {
+    JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+  }
 
   /**
-   * Does a test read of a few rows from a postgres database.
-   *
-   * <p>Note that IT read tests must not do any data table manipulation (setup/clean up.)
-   * @throws SQLException
+   * Tests writing then reading data for a postgres database.
    */
   @Test
-  public void testRead() throws SQLException {
-    String writeTableName = JdbcTestDataSet.READ_TABLE_NAME;
+  public void testWriteThenRead() {
+    runWrite();
+    runRead();
+  }
 
-    PCollection<KV<String, Integer>> output = pipeline.apply(JdbcIO.<KV<String, Integer>>read()
+  /**
+   * Writes the test dataset to postgres.
+   *
+   * <p>This method does not attempt to validate the data - we do so in the read test. This does
+   * make it harder to tell whether a test failed in the write or read phase, but the tests are much
+   * easier to maintain (don't need any separate code to write test data for read tests to
+   * the database.)
+   */
+  private void runWrite() {
+    pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT))
+        .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+        .apply(JdbcIO.<TestRow>write()
             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
-            .withQuery("select name,id from " + writeTableName)
-            .withRowMapper(new CreateKVOfNameAndId())
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-
-    // TODO: validate actual contents of rows, not just count.
-    PAssert.thatSingleton(
-        output.apply("Count All", Count.<KV<String, Integer>>globally()))
-        .isEqualTo(1000L);
+            .withStatement(String.format("insert into %s values(?, ?)", tableName))
+            .withPreparedStatementSetter(new JdbcTestHelper.PrepareStatementFromTestRow()));
 
-    List<KV<String, Long>> expectedCounts = new ArrayList<>();
-    for (String scientist : JdbcTestDataSet.SCIENTISTS) {
-      expectedCounts.add(KV.of(scientist, 100L));
-    }
-    PAssert.that(output.apply("Count Scientist", Count.<String, Integer>perKey()))
-        .containsInAnyOrder(expectedCounts);
-
-    pipeline.run().waitUntilFinish();
+    pipelineWrite.run().waitUntilFinish();
   }
 
   /**
-   * Tests writes to a postgres database.
+   * Read the test dataset from postgres and validate its contents.
+   *
+   * <p>When doing the validation, we wish to ensure that we:
+   * 1. Ensure *all* the rows are correct
+   * 2. Provide enough information in assertions such that it is easy to spot obvious errors (e.g.
+   *    all elements have a similar mistake, or "only 5 elements were generated" and the user wants
+   *    to see what the problem was.
    *
-   * <p>Write Tests must clean up their data - in this case, it uses a new table every test run so
-   * that it won't interfere with read tests/other write tests. It uses finally to attempt to
-   * clean up data at the end of the test run.
-   * @throws SQLException
+   * <p>We do not wish to generate and compare all of the expected values, so this method uses
+   * hashing to ensure that all expected data is present. However, hashing does not provide easy
+   * debugging information (failures like "every element was empty string" are hard to see),
+   * so we also:
+   * 1. Generate expected values for the first and last 500 rows
+   * 2. Use containsInAnyOrder to verify that their values are correct.
+   * Where first/last 500 rows is determined by the fact that we know all rows have a unique id - we
+   * can use the natural ordering of that key.
    */
-  @Test
-  public void testWrite() throws SQLException {
-    writeTableName = JdbcTestDataSet.createWriteDataTable(dataSource);
-
-    ArrayList<KV<Integer, String>> data = new ArrayList<>();
-    for (int i = 0; i < 1000; i++) {
-      KV<Integer, String> kv = KV.of(i, "Test");
-      data.add(kv);
-    }
-    pipeline.apply(Create.of(data))
-        .apply(JdbcIO.<KV<Integer, String>>write()
-            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
-            .withStatement(String.format("insert into %s values(?, ?)", writeTableName))
-            .withPreparedStatementSetter(new PutKeyInColumnOnePutValueInColumnTwo()));
-
-    pipeline.run().waitUntilFinish();
-
-    try (Connection connection = dataSource.getConnection();
-         Statement statement = connection.createStatement();
-         ResultSet resultSet = statement.executeQuery("select count(*) from " + writeTableName)) {
-      resultSet.next();
-      int count = resultSet.getInt(1);
-      Assert.assertEquals(2000, count);
-    }
-    // TODO: Actually verify contents of the rows.
+  private void runRead() {
+    PCollection<TestRow> namesAndIds =
+        pipelineRead.apply(JdbcIO.<TestRow>read()
+        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+        .withQuery(String.format("select name,id from %s;", tableName))
+        .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+        .withCoder(SerializableCoder.of(TestRow.class)));
+
+    PAssert.thatSingleton(
+        namesAndIds.apply("Count All", Count.<TestRow>globally()))
+        .isEqualTo((long) EXPECTED_ROW_COUNT);
+
+    PCollection<String> consolidatedHashcode = namesAndIds
+        .apply(ParDo.of(new TestRow.SelectNameFn()))
+        .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode)
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT));
+
+    PCollection<List<TestRow>> frontOfList =
+        namesAndIds.apply(Top.<TestRow>smallest(500));
+    Iterable<TestRow> expectedFrontOfList = TestRow.getExpectedValues(0, 500);
+    PAssert.thatSingletonIterable(frontOfList).containsInAnyOrder(expectedFrontOfList);
+
+    PCollection<List<TestRow>> backOfList =
+        namesAndIds.apply(Top.<TestRow>largest(500));
+    Iterable<TestRow> expectedBackOfList =
+        TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500,
+            EXPECTED_ROW_COUNT);
+    PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
+
+    pipelineRead.run().waitUntilFinish();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 984ce1a..4ea18ef 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.PrintWriter;
@@ -28,18 +27,22 @@ import java.net.ServerSocket;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import java.util.Collections;
+import javax.sql.DataSource;
+
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.common.TestRow;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.derby.drda.NetworkServerControl;
@@ -58,11 +61,13 @@ import org.slf4j.LoggerFactory;
  */
 public class JdbcIOTest implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);
+  public static final int EXPECTED_ROW_COUNT = 1000;
 
   private static NetworkServerControl derbyServer;
   private static ClientDataSource dataSource;
 
   private static int port;
+  private static String readTableName;
 
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
@@ -108,14 +113,16 @@ public class JdbcIOTest implements Serializable {
     dataSource.setServerName("localhost");
     dataSource.setPortNumber(port);
 
+    readTableName = JdbcTestHelper.getTableName("UT_READ");
 
-    JdbcTestDataSet.createReadDataTable(dataSource);
+    JdbcTestHelper.createDataTable(dataSource, readTableName);
+    addInitialData(dataSource, readTableName);
   }
 
   @AfterClass
   public static void shutDownDatabase() throws Exception {
     try {
-      JdbcTestDataSet.cleanUpDataTable(dataSource, JdbcTestDataSet.READ_TABLE_NAME);
+      JdbcTestHelper.cleanUpDataTable(dataSource, readTableName);
     } finally {
       if (derbyServer != null) {
         derbyServer.shutdown();
@@ -177,39 +184,43 @@ public class JdbcIOTest implements Serializable {
     }
   }
 
+  /**
+   * Create test data that is consistent with that generated by TestRow.
+   */
+  private static void addInitialData(DataSource dataSource, String tableName)
+      throws SQLException {
+    try (Connection connection = dataSource.getConnection()) {
+      connection.setAutoCommit(false);
+      try (PreparedStatement preparedStatement =
+               connection.prepareStatement(
+                   String.format("insert into %s values (?,?)", tableName))) {
+        for (int i = 0; i < EXPECTED_ROW_COUNT; i++) {
+          preparedStatement.clearParameters();
+          preparedStatement.setInt(1, i);
+          preparedStatement.setString(2, TestRow.getNameForSeed(i));
+          preparedStatement.executeUpdate();
+        }
+      }
+      connection.commit();
+    }
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testRead() throws Exception {
-
-    PCollection<KV<String, Integer>> output = pipeline.apply(
-        JdbcIO.<KV<String, Integer>>read()
+    PCollection<TestRow> rows = pipeline.apply(
+        JdbcIO.<TestRow>read()
             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
-            .withQuery("select name,id from " + JdbcTestDataSet.READ_TABLE_NAME)
-            .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
-              @Override
-              public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
-                  KV<String, Integer> kv =
-                      KV.of(resultSet.getString("name"), resultSet.getInt("id"));
-                  return kv;
-              }
-            })
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+            .withQuery("select name,id from " + readTableName)
+            .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+            .withCoder(SerializableCoder.of(TestRow.class)));
 
     PAssert.thatSingleton(
-        output.apply("Count All", Count.<KV<String, Integer>>globally()))
-        .isEqualTo(1000L);
-
-    PAssert.that(output
-        .apply("Count Scientist", Count.<String, Integer>perKey())
-    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
-      @Override
-      public Void apply(Iterable<KV<String, Long>> input) {
-        for (KV<String, Long> element : input) {
-          assertEquals(element.getKey(), 100L, element.getValue().longValue());
-        }
-        return null;
-      }
-    });
+        rows.apply("Count All", Count.<TestRow>globally()))
+        .isEqualTo((long) EXPECTED_ROW_COUNT);
+
+    Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0, EXPECTED_ROW_COUNT);
+    PAssert.that(rows).containsInAnyOrder(expectedValues);
 
     pipeline.run();
   }
@@ -217,32 +228,27 @@ public class JdbcIOTest implements Serializable {
    @Test
    @Category(NeedsRunner.class)
    public void testReadWithSingleStringParameter() throws Exception {
-
-     PCollection<KV<String, Integer>> output = pipeline.apply(
-             JdbcIO.<KV<String, Integer>>read()
+     PCollection<TestRow> rows = pipeline.apply(
+             JdbcIO.<TestRow>read()
                      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
                      .withQuery(String.format("select name,id from %s where name = ?",
-                         JdbcTestDataSet.READ_TABLE_NAME))
+                         readTableName))
                      .withStatementPreparator(new JdbcIO.StatementPreparator() {
                        @Override
                        public void setParameters(PreparedStatement preparedStatement)
-                               throws Exception {
-                         preparedStatement.setString(1, "Darwin");
-                       }
-                     })
-                     .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
-                       @Override
-                       public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
-                         KV<String, Integer> kv =
-                                 KV.of(resultSet.getString("name"), resultSet.getInt("id"));
-                         return kv;
+                           throws Exception {
+                         preparedStatement.setString(1, TestRow.getNameForSeed(1));
                        }
                      })
-                     .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+                     .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+                 .withCoder(SerializableCoder.of(TestRow.class)));
 
      PAssert.thatSingleton(
-             output.apply("Count One Scientist", Count.<KV<String, Integer>>globally()))
-             .isEqualTo(100L);
+         rows.apply("Count All", Count.<TestRow>globally()))
+         .isEqualTo(1L);
+
+     Iterable<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed(1));
+     PAssert.that(rows).containsInAnyOrder(expectedValues);
 
      pipeline.run();
    }
@@ -250,11 +256,13 @@ public class JdbcIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWrite() throws Exception {
+    final long rowsToAdd = 1000L;
 
-    String tableName = JdbcTestDataSet.createWriteDataTable(dataSource);
+    String tableName = JdbcTestHelper.getTableName("UT_WRITE");
+    JdbcTestHelper.createDataTable(dataSource, tableName);
     try {
       ArrayList<KV<Integer, String>> data = new ArrayList<>();
-      for (int i = 0; i < 1000; i++) {
+      for (int i = 0; i < rowsToAdd; i++) {
         KV<Integer, String> kv = KV.of(i, "Test");
         data.add(kv);
       }
@@ -282,19 +290,18 @@ public class JdbcIOTest implements Serializable {
             resultSet.next();
             int count = resultSet.getInt(1);
 
-            Assert.assertEquals(2000, count);
+            Assert.assertEquals(EXPECTED_ROW_COUNT, count);
           }
         }
       }
     } finally {
-      JdbcTestDataSet.cleanUpDataTable(dataSource, tableName);
+      JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
     }
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws Exception {
-
     pipeline
         .apply(Create.empty(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())))
         .apply(JdbcIO.<KV<Integer, String>>write()

http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
deleted file mode 100644
index 0b88be2..0000000
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import javax.sql.DataSource;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.postgresql.ds.PGSimpleDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manipulates test data used by the {@link org.apache.beam.sdk.io.jdbc.JdbcIO} tests.
- *
- * <p>This is independent of the tests so that, for read tests, it can be run once after data
- * store creation rather than before every test run (which can be more fragile).
- */
-public class JdbcTestDataSet {
-  private static final Logger LOG = LoggerFactory.getLogger(JdbcTestDataSet.class);
-  public static final String[] SCIENTISTS = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie",
-      "Faraday", "McClintock", "Herschel", "Hopper", "Lovelace"};
-  /**
-   * Creates the read table ({@link #READ_TABLE_NAME}) used by the IT read tests.
-   *
-   * <p>To invoke this class, you can use this command line:
-   * (run from the jdbc root directory)
-   * mvn test-compile exec:java -Dexec.mainClass=org.apache.beam.sdk.io.jdbc.JdbcTestDataSet \
-   *   -Dexec.args="--postgresServerName=127.0.0.1 --postgresUsername=postgres \
-   *   --postgresDatabaseName=myfancydb \
-   *   --postgresPassword=yourpassword --postgresSsl=false" \
-   *   -Dexec.classpathScope=test
-   * @param args {@link IOTestPipelineOptions} flags used to connect to Postgres, as
-   *     shown above.
-   */
-  public static void main(String[] args) throws SQLException {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    IOTestPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
-
-    createReadDataTable(getDataSource(options));
-  }
-
-  public static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
-      throws SQLException {
-    PGSimpleDataSource dataSource = new PGSimpleDataSource();
-
-    // Tests must receive parameters for connections from PipelineOptions
-    // Parameters should be generic to all tests that use a particular datasource, not
-    // the particular test.
-    dataSource.setDatabaseName(options.getPostgresDatabaseName());
-    dataSource.setServerName(options.getPostgresServerName());
-    dataSource.setPortNumber(options.getPostgresPort());
-    dataSource.setUser(options.getPostgresUsername());
-    dataSource.setPassword(options.getPostgresPassword());
-    dataSource.setSsl(options.getPostgresSsl());
-
-    return dataSource;
-  }
-
-  public static final String READ_TABLE_NAME = "BEAM_TEST_READ";
-
-  public static void createReadDataTable(DataSource dataSource) throws SQLException {
-    createDataTable(dataSource, READ_TABLE_NAME);
-  }
-
-  public static String createWriteDataTable(DataSource dataSource) throws SQLException {
-    String tableName = "BEAMTEST" + org.joda.time.Instant.now().getMillis();
-    createDataTable(dataSource, tableName);
-    return tableName;
-  }
-
-  private static void createDataTable(DataSource dataSource, String tableName) throws SQLException {
-    try (Connection connection = dataSource.getConnection()) {
-      // Creating the table like this will be needed by tests on a newly created Postgres server,
-      // though it will likely happen in PerfKit rather than here.
-      // Alternatively, a pipeline option could indicate whether to re-use the existing
-      // database or create a new one.
-      try (Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format("create table %s (id INT, name VARCHAR(500))", tableName));
-      }
-
-      connection.setAutoCommit(false);
-      try (PreparedStatement preparedStatement =
-               connection.prepareStatement(
-                   String.format("insert into %s values (?,?)", tableName))) {
-        for (int i = 0; i < 1000; i++) {
-          int index = i % SCIENTISTS.length;
-          preparedStatement.clearParameters();
-          preparedStatement.setInt(1, i);
-          preparedStatement.setString(2, SCIENTISTS[index]);
-          preparedStatement.executeUpdate();
-        }
-      }
-      connection.commit();
-    }
-
-    LOG.info("Created table {}", tableName);
-  }
-
-  public static void cleanUpDataTable(DataSource dataSource, String tableName)
-      throws SQLException {
-    if (tableName != null) {
-      try (Connection connection = dataSource.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.executeUpdate(String.format("drop table %s", tableName));
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
new file mode 100644
index 0000000..fedae51
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.TestRow;
+
+/**
+ * Contains test helper methods used by both the integration and unit tests of
+ * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ */
+final class JdbcTestHelper {
+  private JdbcTestHelper() {
+    // Holder for static helpers; not meant to be instantiated.
+  }
+
+  /**
+   * Returns a timestamped table name so that separate test runs against the same database
+   * get distinct tables (provided they do not start within the same millisecond).
+   *
+   * @param testIdentifier short tag naming the calling test; it becomes part of an SQL
+   *     identifier, so it should contain only letters, digits and underscores
+   * @return a name of the form {@code BEAMTEST_<testIdentifier>_<yyyy_MM_dd_HH_mm_ss_S>}
+   */
+  static String getTableName(String testIdentifier) {
+    // SimpleDateFormat is not thread-safe, so a fresh instance is created on every call.
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_S");
+    return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
+  }
+
+  /**
+   * Creates an empty table named {@code tableName} with columns {@code id INT} and
+   * {@code name VARCHAR(500)}.
+   *
+   * @throws SQLException if obtaining the connection or executing CREATE TABLE fails
+   */
+  static void createDataTable(DataSource dataSource, String tableName) throws SQLException {
+    // Table names cannot be bound as JDBC parameters, so the name is formatted into the SQL;
+    // callers are expected to pass names produced by getTableName.
+    try (Connection connection = dataSource.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format("create table %s (id INT, name VARCHAR(500))", tableName));
+    }
+  }
+
+  /**
+   * Drops the table named {@code tableName}; does nothing when {@code tableName} is null.
+   *
+   * @throws SQLException if obtaining the connection or executing DROP TABLE fails
+   */
+  static void cleanUpDataTable(DataSource dataSource, String tableName)
+      throws SQLException {
+    if (tableName != null) {
+      try (Connection connection = dataSource.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.executeUpdate(String.format("drop table %s", tableName));
+      }
+    }
+  }
+
+  /** Maps a result-set row with {@code id} and {@code name} columns to a {@link TestRow}. */
+  static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
+    @Override
+    public TestRow mapRow(ResultSet resultSet) throws Exception {
+      return TestRow.create(
+          resultSet.getInt("id"), resultSet.getString("name"));
+    }
+  }
+
+  /** Binds a {@link TestRow}'s id and name to statement parameters 1 and 2, respectively. */
+  static class PrepareStatementFromTestRow
+      implements JdbcIO.PreparedStatementSetter<TestRow> {
+    @Override
+    public void setParameters(TestRow element, PreparedStatement statement)
+        throws SQLException {
+      statement.setLong(1, element.id());
+      statement.setString(2, element.name());
+    }
+  }
+
+}


[2/2] beam git commit: This closes #2507: JdbcIOIT now uses writeThenRead style

Posted by ke...@apache.org.
This closes #2507: JdbcIOIT now uses writeThenRead style


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fd2c6e1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fd2c6e1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fd2c6e1

Branch: refs/heads/master
Commit: 5fd2c6e139387d3bf1a297adaf5dc4687bcda7ee
Parents: 5f972e8 a6201ed
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 13 13:43:36 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 13:43:36 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/common/pom.xml                     |  10 +
 .../org/apache/beam/sdk/io/common/TestRow.java  | 114 +++++++++++
 sdks/java/io/jdbc/pom.xml                       |  10 +-
 .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java   | 203 ++++++++++---------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 115 ++++++-----
 .../beam/sdk/io/jdbc/JdbcTestDataSet.java       | 130 ------------
 .../apache/beam/sdk/io/jdbc/JdbcTestHelper.java |  81 ++++++++
 7 files changed, 377 insertions(+), 286 deletions(-)
----------------------------------------------------------------------