You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/01/18 07:20:53 UTC

[beam] branch master updated: [BEAM-3456] Enable jenkins and large scale scenario in JDBC (#4392)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f235dd  [BEAM-3456] Enable jenkins and large scale scenario in JDBC (#4392)
2f235dd is described below

commit 2f235dd58acce27f713c7072d62cd3a72e3413a1
Author: Łukasz Gajowy <lu...@gmail.com>
AuthorDate: Thu Jan 18 08:20:48 2018 +0100

    [BEAM-3456] Enable jenkins and large scale scenario in JDBC (#4392)
    
    [BEAM-3456] Enable jenkins and large scale scenario in JDBC
    
    The kubernetes infrastructure that is needed for the
    Jenkins job to run is not available for now.
    We should add it once the infrastructure is there.
---
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |  6 +++---
 .../org/apache/beam/sdk/io/common/TestRow.java     |  4 +++-
 .../java/org/apache/beam/sdk/io/avro/AvroIOIT.java |  2 +-
 .../beam/sdk/io/common/FileBasedIOITHelper.java    | 10 +++++-----
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  5 ++---
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  |  5 ++---
 sdks/java/io/jdbc/pom.xml                          |  2 ++
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 23 ++++++++++------------
 8 files changed, 28 insertions(+), 29 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index e7b475d..b86020e 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -91,10 +91,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
 
   /* Options for test pipeline for file-based I/O in 'sdks/java/io/file-based-io-tests/'. */
   @Description("Number records that will be written and read by the test")
-  @Default.Long(100000)
-  Long getNumberOfRecords();
+  @Default.Integer(100000)
+  Integer getNumberOfRecords();
 
-  void setNumberOfRecords(Long count);
+  void setNumberOfRecords(Integer count);
 
   @Description("Destination prefix for files generated by the test")
   @Validation.Required
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index 5f0a2fb..79a144d 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -95,7 +95,9 @@ public abstract class TestRow implements Serializable, Comparable<TestRow> {
    * the name() for the rows generated from seeds in [0, n).
    */
   private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
-      1000, "7d94d63a41164be058a9680002914358"
+      1000, "7d94d63a41164be058a9680002914358",
+      100_000, "c7cbddb319209e200f1c5eebef8fe960",
+      5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
   );
 
   /**
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index be0d6df..07562f3 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -75,7 +75,7 @@ public class AvroIOIT {
       + "}");
 
   private static String filenamePrefix;
-  private static Long numberOfTextLines;
+  private static Integer numberOfTextLines;
 
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index cf20d8e..40b0461 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -55,11 +55,11 @@ public class FileBasedIOITHelper {
     return String.format("%s_%s", filenamePrefix, new Date().getTime());
   }
 
-  public static String getExpectedHashForLineCount(Long lineCount) {
-    Map<Long, String> expectedHashes = ImmutableMap.of(
-        100_000L, "4c8bb3b99dcc59459b20fefba400d446",
-        1_000_000L, "9796db06e7a7960f974d5a91164afff1",
-        100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
+  public static String getExpectedHashForLineCount(int lineCount) {
+    Map<Integer, String> expectedHashes = ImmutableMap.of(
+        100_000, "4c8bb3b99dcc59459b20fefba400d446",
+        1_000_000, "9796db06e7a7960f974d5a91164afff1",
+        100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
     );
 
     String hash = expectedHashes.get(lineCount);
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 1a4eccc..b611a57 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -23,7 +23,6 @@ import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampT
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
-import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
@@ -65,14 +64,14 @@ import org.junit.runners.JUnit4;
 public class TextIOIT {
 
   private static String filenamePrefix;
-  private static Long numberOfTextLines;
+  private static Integer numberOfTextLines;
   private static Compression compressionType;
 
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws ParseException {
+  public static void setup() {
     IOTestPipelineOptions options = readTestPipelineOptions();
 
     numberOfTextLines = options.getNumberOfRecords();
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 3f08d76..2908c8c 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -23,7 +23,6 @@ import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampT
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
-import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TFRecordIO;
@@ -67,7 +66,7 @@ import org.junit.runners.JUnit4;
 public class TFRecordIOIT {
 
   private static String filenamePrefix;
-  private static Long numberOfTextLines;
+  private static Integer numberOfTextLines;
   private static Compression compressionType;
 
   @Rule
@@ -77,7 +76,7 @@ public class TFRecordIOIT {
   public TestPipeline readPipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws ParseException {
+  public static void setup() {
     IOTestPipelineOptions options = readTestPipelineOptions();
 
     numberOfTextLines = options.getNumberOfRecords();
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index e6bb357..73fbc52 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -122,6 +122,7 @@
               <executable>${python.interpreter.bin}</executable>
               <arguments>
                 <argument>${pkbLocation}</argument>
+                <argument>-beam_it_timeout=1800</argument>
                 <argument>-benchmarks=beam_integration_benchmark</argument>
                 <argument>-beam_it_profile=io-it</argument>
                 <argument>-beam_location=${beamRootProjectDir}</argument>
@@ -204,6 +205,7 @@
               <executable>${python.interpreter.bin}</executable>
               <arguments>
                 <argument>${pkbLocation}</argument>
+                <argument>-beam_it_timeout=1800</argument>
                 <argument>-benchmarks=beam_integration_benchmark</argument>
                 <argument>-beam_it_profile=io-it</argument>
                 <argument>-beam_location=${beamRootProjectDir}</argument>
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 32d6d9e..941a775 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -41,9 +41,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.postgresql.ds.PGSimpleDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 /**
  * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
@@ -56,7 +53,8 @@ import org.slf4j.LoggerFactory;
  *  "--postgresUsername=postgres",
  *  "--postgresDatabaseName=myfancydb",
  *  "--postgresPassword=mypass",
- *  "--postgresSsl=false" ]'
+ *  "--postgresSsl=false",
+ *  "--numberOfRecords=1000" ]'
  * </pre>
  *
  * <p>If you want to run this with a runner besides directrunner, there are profiles for dataflow
@@ -65,8 +63,8 @@ import org.slf4j.LoggerFactory;
  */
 @RunWith(JUnit4.class)
 public class JdbcIOIT {
-  private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class);
-  public static final int EXPECTED_ROW_COUNT = 1000;
+
+  private static int numberOfRows;
   private static PGSimpleDataSource dataSource;
   private static String tableName;
 
@@ -81,14 +79,14 @@ public class JdbcIOIT {
     IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
         .as(IOTestPipelineOptions.class);
 
+    numberOfRows = options.getNumberOfRecords();
     dataSource = getDataSource(options);
 
     tableName = JdbcTestHelper.getTableName("IT");
     JdbcTestHelper.createDataTable(dataSource, tableName);
   }
 
-  private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options)
-      throws SQLException {
+  private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) {
     PGSimpleDataSource dataSource = new PGSimpleDataSource();
 
     dataSource.setDatabaseName(options.getPostgresDatabaseName());
@@ -124,7 +122,7 @@ public class JdbcIOIT {
    * the database.)
    */
   private void runWrite() {
-    pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT))
+    pipelineWrite.apply(GenerateSequence.from(0).to(numberOfRows))
         .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
         .apply(JdbcIO.<TestRow>write()
             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
@@ -162,13 +160,13 @@ public class JdbcIOIT {
 
     PAssert.thatSingleton(
         namesAndIds.apply("Count All", Count.<TestRow>globally()))
-        .isEqualTo((long) EXPECTED_ROW_COUNT);
+        .isEqualTo((long) numberOfRows);
 
     PCollection<String> consolidatedHashcode = namesAndIds
         .apply(ParDo.of(new TestRow.SelectNameFn()))
         .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
     PAssert.that(consolidatedHashcode)
-        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT));
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
 
     PCollection<List<TestRow>> frontOfList =
         namesAndIds.apply(Top.<TestRow>smallest(500));
@@ -178,8 +176,7 @@ public class JdbcIOIT {
     PCollection<List<TestRow>> backOfList =
         namesAndIds.apply(Top.<TestRow>largest(500));
     Iterable<TestRow> expectedBackOfList =
-        TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500,
-            EXPECTED_ROW_COUNT);
+        TestRow.getExpectedValues(numberOfRows - 500, numberOfRows);
     PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
 
     pipelineRead.run().waitUntilFinish();

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].